From 5c59528def9611406f7ad000d5f20fd598716f58 Mon Sep 17 00:00:00 2001 From: Richie Bendall Date: Wed, 6 Dec 2023 01:19:14 +1300 Subject: [PATCH] Add `pMapIterable` export (#63) --- assert-in-range.js | 10 ++ index.d.ts | 36 +++++- index.js | 105 +++++++++++++++++- index.test-d.ts | 4 +- package.json | 1 + readme.md | 29 +++++ test-multiple-pmapskips-performance.js | 4 +- test.js | 146 ++++++++++++++++++++++++- 8 files changed, 324 insertions(+), 11 deletions(-) create mode 100644 assert-in-range.js diff --git a/assert-in-range.js b/assert-in-range.js new file mode 100644 index 0000000..7e73c85 --- /dev/null +++ b/assert-in-range.js @@ -0,0 +1,10 @@ +import chalk from 'chalk'; +import inRange from 'in-range'; + +export default function assertInRange(t, value, {start = 0, end}) { + if (inRange(value, {start, end})) { + t.pass(); + } else { + t.fail(`${start} ${start <= value ? '≤' : chalk.red('≰')} ${chalk.yellow(value)} ${value <= end ? '≤' : chalk.red('≰')} ${end}`); + } +} diff --git a/index.d.ts b/index.d.ts index e11b258..075093a 100644 --- a/index.d.ts +++ b/index.d.ts @@ -1,4 +1,4 @@ -export type Options = { +type BaseOptions = { /** Number of concurrently pending promises returned by `mapper`. @@ -7,7 +7,9 @@ export type Options = { @default Infinity */ readonly concurrency?: number; +}; +export type Options = BaseOptions & { /** When `true`, the first mapper rejection will be rejected back to the consumer. @@ -42,6 +44,17 @@ export type Options = { readonly signal?: AbortSignal; }; +export type IterableOptions = BaseOptions & { + /** + Maximum number of promises returned by `mapper` that have resolved but not yet collected by the consumer of the async iterable. Calls to `mapper` will be limited so that there is never too much backpressure. + + Useful whenever you are consuming the iterable slower than what the mapper function can produce concurrently. For example, to avoid making an overwhelming number of HTTP requests if you are saving each of the results to a database. + + Default: `options.concurrency` + */ + readonly backpressure?: number; +}; + type MaybePromise = T | Promise; /** @@ -88,6 +101,27 @@ export default function pMap( options?: Options ): Promise>>; +/** +@param input - Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. Asynchronous iterables (different from synchronous iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchronous process completes. For example, reading from a remote queue when the queue has reached empty, or reading lines from a stream. +@param mapper - Function which is called for every item in `input`. Expected to return a `Promise` or value. +@returns An async iterable that streams each return value from `mapper` in order. + +@example +``` +import {pMapIterable} from 'p-map'; + +// Multiple posts are fetched concurrently, with limited concurrency and backpressure +for await (const post of pMapIterable(postIds, getPostMetadata, {concurrency: 8})) { + console.log(post); +}; +``` +*/ +export function pMapIterable( + input: AsyncIterable> | Iterable>, + mapper: Mapper, + options?: IterableOptions +): AsyncIterable>; + /** Return this value from a `mapper` function to skip including the value in the returned array. diff --git a/index.js b/index.js index 30290b9..5cc265b 100644 --- a/index.js +++ b/index.js @@ -16,7 +16,7 @@ export default async function pMap( throw new TypeError('Mapper function is required'); } - if (!((Number.isSafeInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency >= 1)) { + if (!((Number.isSafeInteger(concurrency) && concurrency >= 1) || concurrency === Number.POSITIVE_INFINITY)) { throw new TypeError(`Expected \`concurrency\` to be an integer from 1 and up or \`Infinity\`, got \`${concurrency}\` (${typeof concurrency})`); } @@ -162,4 +162,107 @@ export default async function pMap( }); } +export function pMapIterable( + iterable, + mapper, + { + concurrency = Number.POSITIVE_INFINITY, + backpressure = concurrency, + } = {}, +) { + if (iterable[Symbol.iterator] === undefined && iterable[Symbol.asyncIterator] === undefined) { + throw new TypeError(`Expected \`input\` to be either an \`Iterable\` or \`AsyncIterable\`, got (${typeof iterable})`); + } + + if (typeof mapper !== 'function') { + throw new TypeError('Mapper function is required'); + } + + if (!((Number.isSafeInteger(concurrency) && concurrency >= 1) || concurrency === Number.POSITIVE_INFINITY)) { + throw new TypeError(`Expected \`concurrency\` to be an integer from 1 and up or \`Infinity\`, got \`${concurrency}\` (${typeof concurrency})`); + } + + if (!((Number.isSafeInteger(backpressure) && backpressure >= concurrency) || backpressure === Number.POSITIVE_INFINITY)) { + throw new TypeError(`Expected \`backpressure\` to be an integer from \`concurrency\` (${concurrency}) and up or \`Infinity\`, got \`${backpressure}\` (${typeof backpressure})`); + } + + return { + async * [Symbol.asyncIterator]() { + const iterator = iterable[Symbol.asyncIterator] === undefined ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator](); + + const promises = []; + let runningMappersCount = 0; + let isDone = false; + + function trySpawn() { + if (isDone || !(runningMappersCount < concurrency && promises.length < backpressure)) { + return; + } + + const promise = (async () => { + const {done, value} = await iterator.next(); + + if (done) { + return {done: true}; + } + + runningMappersCount++; + + // Spawn if still below concurrency and backpressure limit + trySpawn(); + + try { + const returnValue = await mapper(value); + + runningMappersCount--; + + if (returnValue === pMapSkip) { + const index = promises.indexOf(promise); + + if (index > 0) { + promises.splice(index, 1); + } + } + + // Spawn if still below backpressure limit and just dropped below concurrency limit + trySpawn(); + + return {done: false, value: returnValue}; + } catch (error) { + isDone = true; + return {error}; + } + })(); + + promises.push(promise); + } + + trySpawn(); + + while (promises.length > 0) { + const {error, done, value} = await promises[0]; // eslint-disable-line no-await-in-loop + + promises.shift(); + + if (error) { + throw error; + } + + if (done) { + return; + } + + // Spawn if just dropped below backpressure limit and below the concurrency limit + trySpawn(); + + if (value === pMapSkip) { + continue; + } + + yield value; + } + }, + }; +} + export const pMapSkip = Symbol('skip'); diff --git a/index.test-d.ts b/index.test-d.ts index 81b560f..58acea4 100644 --- a/index.test-d.ts +++ b/index.test-d.ts @@ -1,5 +1,5 @@ import {expectType, expectAssignable} from 'tsd'; -import pMap, {type Options, type Mapper, pMapSkip} from './index.js'; +import pMap, {pMapIterable, type Options, type Mapper, pMapSkip} from './index.js'; const sites = [ 'https://sindresorhus.com', @@ -48,3 +48,5 @@ expectType>(pMap(numbers, (number: number) => { return pMapSkip; })); + +expectType>(pMapIterable(sites, asyncMapper)); diff --git a/package.json b/package.json index c2f2aaa..565c5fc 100644 --- a/package.json +++ b/package.json @@ -42,6 +42,7 @@ ], "devDependencies": { "ava": "^5.2.0", + "chalk": "^5.3.0", "delay": "^5.0.0", "in-range": "^3.0.0", "random-int": "^3.0.0", diff --git a/readme.md b/readme.md index 50cd4d9..9a13178 100644 --- a/readme.md +++ b/readme.md @@ -41,6 +41,19 @@ console.log(result); Returns a `Promise` that is fulfilled when all promises in `input` and ones returned from `mapper` are fulfilled, or rejects if any of the promises reject. The fulfilled value is an `Array` of the fulfilled values returned from `mapper` in `input` order. +### pMapIterable(input, mapper, options?) + +Returns an async iterable that streams each return value from `mapper` in order. + +```js +import {pMapIterable} from 'p-map'; + +// Multiple posts are fetched concurrently, with limited concurrency and backpressure +for await (const post of pMapIterable(postIds, getPostMetadata, {concurrency: 8})) { + console.log(post); +}; +``` + #### input Type: `AsyncIterable | unknown> | Iterable | unknown>` @@ -67,8 +80,22 @@ Minimum: `1` Number of concurrently pending promises returned by `mapper`. +##### backpressure + +**Only for `pMapInterable`** + +Type: `number` *(Integer)*\ +Default: `options.concurrency`\ +Minimum: `options.concurrency` + +Maximum number of promises returned by `mapper` that have resolved but not yet collected by the consumer of the async iterable. Calls to `mapper` will be limited so that there is never too much backpressure. + +Useful whenever you are consuming the iterable slower than what the mapper function can produce concurrently. For example, to avoid making an overwhelming number of HTTP requests if you are saving each of the results to a database. + ##### stopOnError +**Only for `pMap`** + Type: `boolean`\ Default: `true` @@ -80,6 +107,8 @@ Caveat: When `true`, any already-started async mappers will continue to run unti ##### signal +**Only for `pMap`** + Type: [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) You can abort the promises using [`AbortController`](https://developer.mozilla.org/en-US/docs/Web/API/AbortController). diff --git a/test-multiple-pmapskips-performance.js b/test-multiple-pmapskips-performance.js index a70ff30..b3953be 100644 --- a/test-multiple-pmapskips-performance.js +++ b/test-multiple-pmapskips-performance.js @@ -1,6 +1,6 @@ import test from 'ava'; -import inRange from 'in-range'; import timeSpan from 'time-span'; +import assertInRange from './assert-in-range.js'; import pMap, {pMapSkip} from './index.js'; function generateSkipPerformanceData(length) { @@ -32,6 +32,6 @@ test('multiple pMapSkips - algorithmic complexity', async t => { // shorter test. This is not perfect... there is some fluctuation. // The idea here is to catch a regression that makes `pMapSkip` handling O(n^2) // on the number of `pMapSkip` items in the input. - t.true(inRange(longerDuration, {start: 1.2 * smallerDuration, end: 15 * smallerDuration})); + assertInRange(t, longerDuration, {start: 1.2 * smallerDuration, end: 15 * smallerDuration}); } }); diff --git a/test.js b/test.js index 5eec014..03d7d59 100644 --- a/test.js +++ b/test.js @@ -1,9 +1,9 @@ import test from 'ava'; import delay from 'delay'; -import inRange from 'in-range'; import timeSpan from 'time-span'; import randomInt from 'random-int'; -import pMap, {pMapSkip} from './index.js'; +import assertInRange from './assert-in-range.js'; +import pMap, {pMapIterable, pMapSkip} from './index.js'; const sharedInput = [ [async () => 10, 300], @@ -11,6 +11,14 @@ const sharedInput = [ [30, 100], ]; +const longerSharedInput = [ + [10, 300], + [20, 200], + [30, 100], + [40, 50], + [50, 25], +]; + const errorInput1 = [ [20, 200], [30, 100], @@ -33,6 +41,14 @@ const errorInput2 = [ }, 10], ]; +const errorInput3 = [ + [20, 10], + [async () => { + throw new Error('bar'); + }, 100], + [30, 100], +]; + const mapper = async ([value, ms]) => { await delay(ms); @@ -81,13 +97,13 @@ test('main', async t => { t.deepEqual(await pMap(sharedInput, mapper), [10, 20, 30]); // We give it some leeway on both sides of the expected 300ms as the exact value depends on the machine and workload. - t.true(inRange(end(), {start: 290, end: 430})); + assertInRange(t, end(), {start: 290, end: 430}); }); test('concurrency: 1', async t => { const end = timeSpan(); t.deepEqual(await pMap(sharedInput, mapper, {concurrency: 1}), [10, 20, 30]); - t.true(inRange(end(), {start: 590, end: 760})); + assertInRange(t, end(), {start: 590, end: 760}); }); test('concurrency: 4', async t => { @@ -217,13 +233,13 @@ test('asyncIterator - main', async t => { t.deepEqual(await pMap(new AsyncTestData(sharedInput), mapper), [10, 20, 30]); // We give it some leeway on both sides of the expected 300ms as the exact value depends on the machine and workload. - t.true(inRange(end(), {start: 290, end: 430})); + assertInRange(t, end(), {start: 290, end: 430}); }); test('asyncIterator - concurrency: 1', async t => { const end = timeSpan(); t.deepEqual(await pMap(new AsyncTestData(sharedInput), mapper, {concurrency: 1}), [10, 20, 30]); - t.true(inRange(end(), {start: 590, end: 760})); + assertInRange(t, end(), {start: 590, end: 760}); }); test('asyncIterator - concurrency: 4', async t => { @@ -485,3 +501,121 @@ if (globalThis.AbortController !== undefined) { }); }); } + +async function collectAsyncIterable(asyncIterable) { + const values = []; + + for await (const value of asyncIterable) { + values.push(value); + } + + return values; +} + +test('pMapIterable', async t => { + t.deepEqual(await collectAsyncIterable(pMapIterable(sharedInput, mapper)), [10, 20, 30]); +}); + +test('pMapIterable - empty', async t => { + t.deepEqual(await collectAsyncIterable(pMapIterable([], mapper)), []); +}); + +test('pMapIterable - iterable that throws', async t => { + let isFirstNextCall = true; + + const iterable = { + [Symbol.asyncIterator]() { + return { + async next() { + if (!isFirstNextCall) { + return {done: true}; + } + + isFirstNextCall = false; + throw new Error('foo'); + }, + }; + }, + }; + + const iterator = pMapIterable(iterable, mapper)[Symbol.asyncIterator](); + + await t.throwsAsync(iterator.next(), {message: 'foo'}); +}); + +test('pMapIterable - mapper that throws', async t => { + await t.throwsAsync(collectAsyncIterable(pMapIterable(sharedInput, async () => { + throw new Error('foo'); + })), {message: 'foo'}); +}); + +test('pMapIterable - stop on error', async t => { + const output = []; + + try { + for await (const value of pMapIterable(errorInput3, mapper)) { + output.push(value); + } + } catch (error) { + t.is(error.message, 'bar'); + } + + t.deepEqual(output, [20]); +}); + +test('pMapIterable - concurrency: 1', async t => { + const end = timeSpan(); + t.deepEqual(await collectAsyncIterable(pMapIterable(sharedInput, mapper, {concurrency: 1, backpressure: Number.POSITIVE_INFINITY})), [10, 20, 30]); + + // It could've only taken this much time if each were run in series + assertInRange(t, end(), {start: 590, end: 760}); +}); + +test('pMapIterable - concurrency: 2', async t => { + const times = new Map(); + const end = timeSpan(); + + t.deepEqual(await collectAsyncIterable(pMapIterable(longerSharedInput, value => { + times.set(value[0], end()); + return mapper(value); + }, {concurrency: 2, backpressure: Number.POSITIVE_INFINITY})), [10, 20, 30, 40, 50]); + + assertInRange(t, times.get(10), {start: 0, end: 50}); + assertInRange(t, times.get(20), {start: 0, end: 50}); + assertInRange(t, times.get(30), {start: 200, end: 250}); + assertInRange(t, times.get(40), {start: 300, end: 350}); + assertInRange(t, times.get(50), {start: 300, end: 350}); +}); + +test('pMapIterable - backpressure', async t => { + let currentValue; + + // Concurrency option is forced by an early check + const asyncIterator = pMapIterable(longerSharedInput, async value => { + currentValue = await mapper(value); + return currentValue; + }, {backpressure: 2, concurrency: 2})[Symbol.asyncIterator](); + + const {value: value1} = await asyncIterator.next(); + t.is(value1, 10); + + // If backpressure is not respected, than all items will be evaluated in this time + await delay(600); + + t.is(currentValue, 30); + + const {value: value2} = await asyncIterator.next(); + t.is(value2, 20); + + await delay(100); + + t.is(currentValue, 40); +}); + +test('pMapIterable - pMapSkip', async t => { + t.deepEqual(await collectAsyncIterable(pMapIterable([ + 1, + pMapSkip, + 2, + ], async value => value)), [1, 2]); +});