Skip to content

Commit

Permalink
Add pMapIterable export (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
Richienb committed Dec 5, 2023
1 parent 136b08a commit 5c59528
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 11 deletions.
10 changes: 10 additions & 0 deletions assert-in-range.js
@@ -0,0 +1,10 @@
import chalk from 'chalk';
import inRange from 'in-range';

/**
Assert that `value` is accepted by `inRange` for the given bounds.

Passes the test when the range check succeeds. Otherwise fails it with a message of the form `start ≤ value ≤ end`, in which each comparison that does not hold is rendered as a red `≰` and the value is highlighted in yellow.

@param t - Test execution context exposing `pass()` and `fail(message)`.
@param value - The number to check.
@param options - Range bounds: `start` (defaults to `0`) and `end`.
*/
export default function assertInRange(t, value, {start = 0, end}) {
	if (!inRange(value, {start, end})) {
		// Build each comparison sign separately so the side that failed stands out.
		const lowerSign = start <= value ? '≤' : chalk.red('≰');
		const upperSign = value <= end ? '≤' : chalk.red('≰');

		t.fail(`${start} ${lowerSign} ${chalk.yellow(value)} ${upperSign} ${end}`);
		return;
	}

	t.pass();
}
36 changes: 35 additions & 1 deletion index.d.ts
@@ -1,4 +1,4 @@
export type Options = {
type BaseOptions = {
/**
Number of concurrently pending promises returned by `mapper`.
Expand All @@ -7,7 +7,9 @@ export type Options = {
@default Infinity
*/
readonly concurrency?: number;
};

export type Options = BaseOptions & {
/**
When `true`, the first mapper rejection will be rejected back to the consumer.
Expand Down Expand Up @@ -42,6 +44,17 @@ export type Options = {
readonly signal?: AbortSignal;
};

/**
Options accepted by `pMapIterable`: the shared `concurrency` option plus `backpressure`.
*/
export type IterableOptions = BaseOptions & {
/**
Maximum number of promises returned by `mapper` that have resolved but have not yet been collected by the consumer of the async iterable. Calls to `mapper` will be limited so that there is never too much backpressure.
Useful whenever you are consuming the iterable slower than what the mapper function can produce concurrently. For example, to avoid making an overwhelming number of HTTP requests if you are saving each of the results to a database.
Must be an integer greater than or equal to `options.concurrency`, or `Infinity`.
@default `options.concurrency`
*/
readonly backpressure?: number;
};

/**
Either a plain value of type `T` or a `Promise` that resolves to one.
*/
type MaybePromise<T> = T | Promise<T>;

/**
Expand Down Expand Up @@ -88,6 +101,27 @@ export default function pMap<Element, NewElement>(
options?: Options
): Promise<Array<Exclude<NewElement, typeof pMapSkip>>>;

/**
Map over an iterable concurrently and consume the results lazily as an async iterable.
@param input - Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. Asynchronous iterables (different from synchronous iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchronous process completes. For example, reading from a remote queue when the queue has reached empty, or reading lines from a stream.
@param mapper - Function which is called for every item in `input`. Expected to return a `Promise` or value.
@param options - Optional `concurrency` and `backpressure` limits.
@returns An async iterable that streams each return value from `mapper` in order. Values equal to `pMapSkip` are excluded from the element type.
@example
```
import {pMapIterable} from 'p-map';
// Multiple posts are fetched concurrently, with limited concurrency and backpressure
for await (const post of pMapIterable(postIds, getPostMetadata, {concurrency: 8})) {
console.log(post);
}
```
*/
export function pMapIterable<Element, NewElement>(
input: AsyncIterable<Element | Promise<Element>> | Iterable<Element | Promise<Element>>,
mapper: Mapper<Element, NewElement>,
options?: IterableOptions
): AsyncIterable<Exclude<NewElement, typeof pMapSkip>>;

/**
Return this value from a `mapper` function to skip including the value in the returned array.
Expand Down
105 changes: 104 additions & 1 deletion index.js
Expand Up @@ -16,7 +16,7 @@ export default async function pMap(
throw new TypeError('Mapper function is required');
}

if (!((Number.isSafeInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency >= 1)) {
if (!((Number.isSafeInteger(concurrency) && concurrency >= 1) || concurrency === Number.POSITIVE_INFINITY)) {
throw new TypeError(`Expected \`concurrency\` to be an integer from 1 and up or \`Infinity\`, got \`${concurrency}\` (${typeof concurrency})`);
}

Expand Down Expand Up @@ -162,4 +162,107 @@ export default async function pMap(
});
}

export function pMapIterable(
iterable,
mapper,
{
concurrency = Number.POSITIVE_INFINITY,
backpressure = concurrency,
} = {},
) {
if (iterable[Symbol.iterator] === undefined && iterable[Symbol.asyncIterator] === undefined) {
throw new TypeError(`Expected \`input\` to be either an \`Iterable\` or \`AsyncIterable\`, got (${typeof iterable})`);
}

if (typeof mapper !== 'function') {
throw new TypeError('Mapper function is required');
}

if (!((Number.isSafeInteger(concurrency) && concurrency >= 1) || concurrency === Number.POSITIVE_INFINITY)) {
throw new TypeError(`Expected \`concurrency\` to be an integer from 1 and up or \`Infinity\`, got \`${concurrency}\` (${typeof concurrency})`);
}

if (!((Number.isSafeInteger(backpressure) && backpressure >= concurrency) || backpressure === Number.POSITIVE_INFINITY)) {
throw new TypeError(`Expected \`backpressure\` to be an integer from \`concurrency\` (${concurrency}) and up or \`Infinity\`, got \`${backpressure}\` (${typeof backpressure})`);
}

return {
async * [Symbol.asyncIterator]() {
const iterator = iterable[Symbol.asyncIterator] === undefined ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator]();

const promises = [];
let runningMappersCount = 0;
let isDone = false;

function trySpawn() {
if (isDone || !(runningMappersCount < concurrency && promises.length < backpressure)) {
return;
}

const promise = (async () => {
const {done, value} = await iterator.next();

if (done) {
return {done: true};
}

runningMappersCount++;

// Spawn if still below concurrency and backpressure limit
trySpawn();

try {
const returnValue = await mapper(value);

runningMappersCount--;

if (returnValue === pMapSkip) {
const index = promises.indexOf(promise);

if (index > 0) {
promises.splice(index, 1);
}
}

// Spawn if still below backpressure limit and just dropped below concurrency limit
trySpawn();

return {done: false, value: returnValue};
} catch (error) {
isDone = true;
return {error};
}
})();

promises.push(promise);
}

trySpawn();

while (promises.length > 0) {
const {error, done, value} = await promises[0]; // eslint-disable-line no-await-in-loop

promises.shift();

if (error) {
throw error;
}

if (done) {
return;
}

// Spawn if just dropped below backpressure limit and below the concurrency limit
trySpawn();

if (value === pMapSkip) {
continue;
}

yield value;
}
},
};
}

export const pMapSkip = Symbol('skip');
4 changes: 3 additions & 1 deletion index.test-d.ts
@@ -1,5 +1,5 @@
import {expectType, expectAssignable} from 'tsd';
import pMap, {type Options, type Mapper, pMapSkip} from './index.js';
import pMap, {pMapIterable, type Options, type Mapper, pMapSkip} from './index.js';

const sites = [
'https://sindresorhus.com',
Expand Down Expand Up @@ -48,3 +48,5 @@ expectType<Promise<number[]>>(pMap(numbers, (number: number) => {

return pMapSkip;
}));

expectType<AsyncIterable<string>>(pMapIterable(sites, asyncMapper));
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -42,6 +42,7 @@
],
"devDependencies": {
"ava": "^5.2.0",
"chalk": "^5.3.0",
"delay": "^5.0.0",
"in-range": "^3.0.0",
"random-int": "^3.0.0",
Expand Down
29 changes: 29 additions & 0 deletions readme.md
Expand Up @@ -41,6 +41,19 @@ console.log(result);

Returns a `Promise` that is fulfilled when all promises in `input` and ones returned from `mapper` are fulfilled, or rejects if any of the promises reject. The fulfilled value is an `Array` of the fulfilled values returned from `mapper` in `input` order.

### pMapIterable(input, mapper, options?)

Returns an async iterable that streams each return value from `mapper` in order.

```js
import {pMapIterable} from 'p-map';

// Multiple posts are fetched concurrently, with limited concurrency and backpressure
for await (const post of pMapIterable(postIds, getPostMetadata, {concurrency: 8})) {
console.log(post);
}
```

#### input

Type: `AsyncIterable<Promise<unknown> | unknown> | Iterable<Promise<unknown> | unknown>`
Expand All @@ -67,8 +80,22 @@ Minimum: `1`

Number of concurrently pending promises returned by `mapper`.

##### backpressure

**Only for `pMapIterable`**

Type: `number` *(Integer)*\
Default: `options.concurrency`\
Minimum: `options.concurrency`

Maximum number of promises returned by `mapper` that have resolved but have not yet been collected by the consumer of the async iterable. Calls to `mapper` will be limited so that there is never too much backpressure.

Useful whenever you are consuming the iterable slower than what the mapper function can produce concurrently. For example, to avoid making an overwhelming number of HTTP requests if you are saving each of the results to a database.

##### stopOnError

**Only for `pMap`**

Type: `boolean`\
Default: `true`

Expand All @@ -80,6 +107,8 @@ Caveat: When `true`, any already-started async mappers will continue to run unti

##### signal

**Only for `pMap`**

Type: [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal)

You can abort the promises using [`AbortController`](https://developer.mozilla.org/en-US/docs/Web/API/AbortController).
Expand Down
4 changes: 2 additions & 2 deletions test-multiple-pmapskips-performance.js
@@ -1,6 +1,6 @@
import test from 'ava';
import inRange from 'in-range';
import timeSpan from 'time-span';
import assertInRange from './assert-in-range.js';
import pMap, {pMapSkip} from './index.js';

function generateSkipPerformanceData(length) {
Expand Down Expand Up @@ -32,6 +32,6 @@ test('multiple pMapSkips - algorithmic complexity', async t => {
// shorter test. This is not perfect... there is some fluctuation.
// The idea here is to catch a regression that makes `pMapSkip` handling O(n^2)
// on the number of `pMapSkip` items in the input.
t.true(inRange(longerDuration, {start: 1.2 * smallerDuration, end: 15 * smallerDuration}));
assertInRange(t, longerDuration, {start: 1.2 * smallerDuration, end: 15 * smallerDuration});
}
});

0 comments on commit 5c59528

Please sign in to comment.