Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pMapIterable #63

Merged
merged 25 commits into from Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 35 additions & 1 deletion index.d.ts
@@ -1,4 +1,4 @@
export type Options = {
type BaseOptions = {
/**
Number of concurrently pending promises returned by `mapper`.

Expand All @@ -7,7 +7,9 @@ export type Options = {
@default Infinity
*/
readonly concurrency?: number;
};

export type Options = BaseOptions & {
/**
When `true`, the first mapper rejection will be rejected back to the consumer.

Expand Down Expand Up @@ -42,6 +44,17 @@ export type Options = {
readonly signal?: AbortSignal;
};

export type IterableOptions = BaseOptions & {
/**
Maximum number of promises returned by `mapper` that have resolved but not yet collected by the consumer of the async iterable. Calls to `mapper` will be limited so that there is never too much backpressure.

Richienb marked this conversation as resolved.
Show resolved Hide resolved
Useful whenever you are consuming the iterable slower than what the mapper function can produce concurrently. For example, to avoid making an overwhelming number of HTTP requests if you are saving each of the results to a database.

Default: `options.concurrency`
*/
readonly backpressure?: number;
};

type MaybePromise<T> = T | Promise<T>;

/**
Expand Down Expand Up @@ -88,6 +101,27 @@ export default function pMap<Element, NewElement>(
options?: Options
): Promise<Array<Exclude<NewElement, typeof pMapSkip>>>;

/**
@param input - Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. Asynchronous iterables (different from synchronous iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchronous process completes. For example, reading from a remote queue when the queue has reached empty, or reading lines from a stream.
@param mapper - Function which is called for every item in `input`. Expected to return a `Promise` or value.
@returns An async iterable that streams each return value from `mapper` in order.

@example
```
import {pMapIterable} from 'p-map';

// Multiple posts are fetched concurrently, with limited concurrency and backpressure
for await (const post of pMapIterable(postIds, getPostMetadata, {concurrency: 8})) {
console.log(post);
};
```
*/
export function pMapIterable<Element, NewElement>(
input: AsyncIterable<Element | Promise<Element>> | Iterable<Element | Promise<Element>>,
mapper: Mapper<Element, NewElement>,
options?: IterableOptions
): AsyncIterable<Exclude<NewElement, typeof pMapSkip>>;

/**
Return this value from a `mapper` function to skip including the value in the returned array.

Expand Down
147 changes: 146 additions & 1 deletion index.js
Expand Up @@ -46,7 +46,7 @@ export default async function pMap(
throw new TypeError('Mapper function is required');
}

if (!((Number.isSafeInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency >= 1)) {
if (!((Number.isSafeInteger(concurrency) && concurrency >= 1) || concurrency === Number.POSITIVE_INFINITY)) {
throw new TypeError(`Expected \`concurrency\` to be an integer from 1 and up or \`Infinity\`, got \`${concurrency}\` (${typeof concurrency})`);
}

Expand Down Expand Up @@ -192,4 +192,149 @@ export default async function pMap(
});
}

export function pMapIterable(
iterable,
mapper,
{
concurrency = Number.POSITIVE_INFINITY,
backpressure = concurrency,
} = {},
) {
if (iterable[Symbol.iterator] === undefined && iterable[Symbol.asyncIterator] === undefined) {
throw new TypeError(`Expected \`input\` to be either an \`Iterable\` or \`AsyncIterable\`, got (${typeof iterable})`);
}

if (typeof mapper !== 'function') {
throw new TypeError('Mapper function is required');
}

if (!((Number.isSafeInteger(concurrency) && concurrency >= 1) || concurrency === Number.POSITIVE_INFINITY)) {
throw new TypeError(`Expected \`concurrency\` to be an integer from 1 and up or \`Infinity\`, got \`${concurrency}\` (${typeof concurrency})`);
}

if (!((Number.isSafeInteger(backpressure) && backpressure >= concurrency) || backpressure === Number.POSITIVE_INFINITY)) {
throw new TypeError(`Expected \`backpressure\` to be an integer from \`concurrency\` (${concurrency}) and up or \`Infinity\`, got \`${backpressure}\` (${typeof backpressure})`);
}

return {
[Symbol.asyncIterator]() {
Richienb marked this conversation as resolved.
Show resolved Hide resolved
let isDone = false;
const pendingQueue = [];
const waitingQueue = [];
const valueQueue = [];
const valuePromises = [];

const iterator = iterable[Symbol.asyncIterator] === undefined ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator]();

function tryToFlushWaitingQueue() {
while (waitingQueue[0]) {
const result = waitingQueue.shift();

if (valuePromises.length > 0) {
const {resolve, reject} = valuePromises.shift();

if (result.done) {
resolve({done: true});
} else if (result.error) {
reject(result.error);
} else {
resolve({done: false, value: result.value});
}
} else {
valueQueue.push(result);
}
}
}

async function tryToContinue() {
while (pendingQueue.length < concurrency && valueQueue.length + waitingQueue.length + pendingQueue.length < backpressure && !isDone) {
try {
const {done, value} = await iterator.next(); // eslint-disable-line no-await-in-loop

if (done) {
isDone = true;
waitingQueue[pendingQueue.length] = {done: true};
tryToFlushWaitingQueue();

return;
}

const promise = (async () => {
try {
const result = await mapper(value);

const index = pendingQueue.indexOf(promise);

pendingQueue.splice(index, 1);
tryToContinue();

if (result === pMapSkip) {
waitingQueue.splice(index, 1);
} else {
waitingQueue[index] = {value: result};
}
} catch (error) {
const index = pendingQueue.indexOf(promise);

pendingQueue.splice(index);

waitingQueue[index] = {error};

isDone = true;
waitingQueue[index + 1] = {done: true};
} finally {
tryToFlushWaitingQueue();
}
})();

pendingQueue.push(promise);
} catch (error) {
waitingQueue[pendingQueue.length] = {error};

isDone = true;
waitingQueue[pendingQueue.length + 1] = {done: true};

tryToFlushWaitingQueue();
}
}
}

tryToContinue();

return {
async next() {
if (isDone && pendingQueue.length === 0 && waitingQueue.length === 0 && valueQueue.length === 0) {
return {done: true};
}

if (valueQueue.length > 0) {
const {done, value, error} = valueQueue.shift();

tryToContinue();

if (done) {
return {done: true};
}

if (error) {
throw error;
}

return {done: false, value};
}

return new Promise((resolve, reject) => {
valuePromises.push({resolve, reject});
});
},
async return() {
isDone = true;

return {done: true};
},
};
},
Richienb marked this conversation as resolved.
Show resolved Hide resolved
};
}

export const pMapSkip = Symbol('skip');
4 changes: 3 additions & 1 deletion index.test-d.ts
@@ -1,5 +1,5 @@
import {expectType, expectAssignable} from 'tsd';
import pMap, {type Options, type Mapper, pMapSkip} from './index.js';
import pMap, {pMapIterable, type Options, type Mapper, pMapSkip} from './index.js';

const sites = [
'https://sindresorhus.com',
Expand Down Expand Up @@ -48,3 +48,5 @@ expectType<Promise<number[]>>(pMap(numbers, (number: number) => {

return pMapSkip;
}));

expectType<AsyncIterable<string>>(pMapIterable(sites, asyncMapper));
29 changes: 29 additions & 0 deletions readme.md
Expand Up @@ -41,6 +41,19 @@ console.log(result);

Returns a `Promise` that is fulfilled when all promises in `input` and ones returned from `mapper` are fulfilled, or rejects if any of the promises reject. The fulfilled value is an `Array` of the fulfilled values returned from `mapper` in `input` order.

### pMapIterable(input, mapper, options?)

Returns an async iterable that streams each return value from `mapper` in order.

```js
import {pMapIterable} from 'p-map';

// Multiple posts are fetched concurrently, with limited concurrency and backpressure
for await (const post of pMapIterable(postIds, getPostMetadata, {concurrency: 8})) {
console.log(post);
};
```

#### input

Type: `AsyncIterable<Promise<unknown> | unknown> | Iterable<Promise<unknown> | unknown>`
Expand All @@ -67,8 +80,22 @@ Minimum: `1`

Number of concurrently pending promises returned by `mapper`.

##### backpressure

**Only for `pMapInterable`**

Type: `number` *(Integer)*\
Default: `options.concurrency`\
Minimum: `options.concurrency`

Maximum number of promises returned by `mapper` that have resolved but not yet collected by the consumer of the async iterable. Calls to `mapper` will be limited so that there is never too much backpressure.

Useful whenever you are consuming the iterable slower than what the mapper function can produce concurrently. For example, to avoid making an overwhelming number of HTTP requests if you are saving each of the results to a database.

##### stopOnError

**Only for `pMap`**

Type: `boolean`\
Default: `true`

Expand All @@ -80,6 +107,8 @@ Caveat: When `true`, any already-started async mappers will continue to run unti

##### signal

**Only for `pMap`**

Type: [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal)

You can abort the promises using [`AbortController`](https://developer.mozilla.org/en-US/docs/Web/API/AbortController).
Expand Down
71 changes: 70 additions & 1 deletion test.js
Expand Up @@ -3,7 +3,7 @@ import delay from 'delay';
import inRange from 'in-range';
import timeSpan from 'time-span';
import randomInt from 'random-int';
import pMap, {pMapSkip} from './index.js';
import pMap, {pMapIterable, pMapSkip} from './index.js';

const sharedInput = [
[async () => 10, 300],
Expand Down Expand Up @@ -33,6 +33,14 @@ const errorInput2 = [
}, 10],
];

const errorInput3 = [
[20, 10],
[async () => {
throw new Error('bar');
}, 100],
[30, 100],
];

const mapper = async ([value, ms]) => {
await delay(ms);

Expand Down Expand Up @@ -485,3 +493,64 @@ if (globalThis.AbortController !== undefined) {
});
});
}

async function collectAsyncIterable(asyncIterable) {
const values = [];

for await (const value of asyncIterable) {
values.push(value);
}

return values;
}

test('pMapIterable', async t => {
t.deepEqual(await collectAsyncIterable(pMapIterable(sharedInput, mapper)), [10, 20, 30]);
});

test('pMapIterable - empty', async t => {
t.deepEqual(await collectAsyncIterable(pMapIterable([], mapper)), []);
});

test('pMapIterable - iterable that throws', async t => {
let isFirstNextCall = true;

const iterable = {
[Symbol.asyncIterator]() {
return {
async next() {
if (!isFirstNextCall) {
return {done: true};
}

isFirstNextCall = false;
throw new Error('foo');
},
};
},
};

const iterator = pMapIterable(iterable, mapper)[Symbol.asyncIterator]();

await t.throwsAsync(iterator.next(), {message: 'foo'});
});

test('pMapIterable - mapper that throws', async t => {
await t.throwsAsync(collectAsyncIterable(pMapIterable(sharedInput, async () => {
throw new Error('foo');
})), {message: 'foo'});
});

test('pMapIterable - stop on error', async t => {
const output = [];

try {
for await (const value of pMapIterable(errorInput3, mapper)) {
output.push(value);
}
} catch (error) {
t.is(error.message, 'bar');
}

t.deepEqual(output, [20]);
});
Richienb marked this conversation as resolved.
Show resolved Hide resolved