Skip to content

Commit

Permalink
Add stopOnError option (#16)
Browse files Browse the repository at this point in the history
Co-authored-by: Sindre Sorhus <sindresorhus@gmail.com>
  • Loading branch information
yaodingyd and sindresorhus committed Jul 13, 2019
1 parent e870776 commit 9f0b32f
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 5 deletions.
7 changes: 7 additions & 0 deletions index.d.ts
Expand Up @@ -6,6 +6,13 @@ declare namespace pMap {
@default Infinity
*/
concurrency?: number;

/**
When set to `false`, instead of stopping when a promise rejects, it will wait for all the promises to settle and then reject with an [aggregated error](https://github.com/sindresorhus/aggregate-error) containing all the errors from the rejected promises.
@default true
*/
stopOnError?: boolean;
}

/**
Expand Down
23 changes: 18 additions & 5 deletions index.js
@@ -1,21 +1,24 @@
'use strict';
const AggregateError = require('aggregate-error');

const pMap = (iterable, mapper, options) => new Promise((resolve, reject) => {
options = Object.assign({
concurrency: Infinity
concurrency: Infinity,
stopOnError: true
}, options);

if (typeof mapper !== 'function') {
throw new TypeError('Mapper function is required');
}

const {concurrency} = options;
const {concurrency, stopOnError} = options;

if (!(typeof concurrency === 'number' && concurrency >= 1)) {
throw new TypeError(`Expected \`concurrency\` to be a number from 1 and up, got \`${concurrency}\` (${typeof concurrency})`);
}

const ret = [];
const errors = [];
const iterator = iterable[Symbol.iterator]();
let isRejected = false;
let isIterableDone = false;
Expand All @@ -35,7 +38,11 @@ const pMap = (iterable, mapper, options) => new Promise((resolve, reject) => {
isIterableDone = true;

if (resolvingCount === 0) {
resolve(ret);
if (!stopOnError && errors.length !== 0) {
reject(new AggregateError(errors));
} else {
resolve(ret);
}
}

return;
Expand All @@ -52,8 +59,14 @@ const pMap = (iterable, mapper, options) => new Promise((resolve, reject) => {
next();
},
error => {
isRejected = true;
reject(error);
if (stopOnError) {
isRejected = true;
reject(error);
} else {
errors.push(error);
resolvingCount--;
next();
}
}
);
};
Expand Down
1 change: 1 addition & 0 deletions index.test-d.ts
Expand Up @@ -21,6 +21,7 @@ expectType<Mapper<string, string | number>>(multiResultTypeMapper);

expectType<Options>({});
expectType<Options>({concurrency: 0});
expectType<Options>({stopOnError: false});

expectType<Promise<string[]>>(pMap(sites, asyncMapper));
expectType<Promise<string[]>>(pMap(sites, asyncMapper, {concurrency: 2}));
Expand Down
3 changes: 3 additions & 0 deletions package.json
Expand Up @@ -37,6 +37,9 @@
"parallel",
"bluebird"
],
"dependencies": {
"aggregate-error": "^2.0.0"
},
"devDependencies": {
"ava": "^1.4.1",
"delay": "^4.1.0",
Expand Down
7 changes: 7 additions & 0 deletions readme.md
Expand Up @@ -68,6 +68,13 @@ Minimum: `1`

Number of concurrently pending promises returned by `mapper`.

##### stopOnError

Type: `boolean`<br>
Default: `true`

When set to `false`, instead of stopping when a promise rejects, it will wait for all the promises to settle and then reject with an [aggregated error](https://github.com/sindresorhus/aggregate-error) containing all the errors from the rejected promises.


## Related

Expand Down
26 changes: 26 additions & 0 deletions test.js
Expand Up @@ -3,6 +3,7 @@ import delay from 'delay';
import inRange from 'in-range';
import timeSpan from 'time-span';
import randomInt from 'random-int';
import AggregateError from 'aggregate-error';
import pMap from '.';

const input = [
Expand All @@ -11,6 +12,20 @@ const input = [
[30, 100]
];

const errorInput1 = [
[20, 200],
[30, 100],
Promise.reject(new Error('foo')),
Promise.reject(new Error('bar'))
];

const errorInput2 = [
[20, 200],
Promise.reject(new Error('bar')),
[30, 100],
Promise.reject(new Error('foo'))
];

const mapper = ([value, ms]) => delay(ms, {value});

test('main', async t => {
Expand Down Expand Up @@ -69,3 +84,14 @@ test('enforce number in options.concurrency', async t => {
await t.notThrowsAsync(pMap([], () => {}, {concurrency: 10}));
await t.notThrowsAsync(pMap([], () => {}, {concurrency: Infinity}));
});

test('immediately rejects when stopOnError is true', async t => {
await t.throwsAsync(pMap(errorInput1, mapper, {concurrency: 1}), 'foo');
await t.throwsAsync(pMap(errorInput2, mapper, {concurrency: 1}), 'bar');
});

test('aggregate errors when stopOnError is false', async t => {
await t.notThrowsAsync(pMap(input, mapper, {concurrency: 1, stopOnError: false}));
await t.throwsAsync(pMap(errorInput1, mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /foo(.|\n)*bar/});
await t.throwsAsync(pMap(errorInput2, mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /bar(.|\n)*foo/});
});

0 comments on commit 9f0b32f

Please sign in to comment.