From 9f0b32febbf982630c7b1956feffa39986eae1f0 Mon Sep 17 00:00:00 2001 From: Yao Ding Date: Sat, 13 Jul 2019 11:03:41 -0400 Subject: [PATCH] Add `stopOnError` option (#16) Co-authored-by: Sindre Sorhus --- index.d.ts | 7 +++++++ index.js | 23 ++++++++++++++++++----- index.test-d.ts | 1 + package.json | 3 +++ readme.md | 7 +++++++ test.js | 26 ++++++++++++++++++++++++++ 6 files changed, 62 insertions(+), 5 deletions(-) diff --git a/index.d.ts b/index.d.ts index 49da529..90269fc 100644 --- a/index.d.ts +++ b/index.d.ts @@ -6,6 +6,13 @@ declare namespace pMap { @default Infinity */ concurrency?: number; + + /** + When set to `false`, instead of stopping when a promise rejects, it will wait for all the promises to settle and then reject with an [aggregated error](https://github.com/sindresorhus/aggregate-error) containing all the errors from the rejected promises. + + @default true + */ + stopOnError?: boolean; } /** diff --git a/index.js b/index.js index 36f5c28..250c5e7 100644 --- a/index.js +++ b/index.js @@ -1,21 +1,24 @@ 'use strict'; +const AggregateError = require('aggregate-error'); const pMap = (iterable, mapper, options) => new Promise((resolve, reject) => { options = Object.assign({ - concurrency: Infinity + concurrency: Infinity, + stopOnError: true }, options); if (typeof mapper !== 'function') { throw new TypeError('Mapper function is required'); } - const {concurrency} = options; + const {concurrency, stopOnError} = options; if (!(typeof concurrency === 'number' && concurrency >= 1)) { throw new TypeError(`Expected \`concurrency\` to be a number from 1 and up, got \`${concurrency}\` (${typeof concurrency})`); } const ret = []; + const errors = []; const iterator = iterable[Symbol.iterator](); let isRejected = false; let isIterableDone = false; @@ -35,7 +38,11 @@ const pMap = (iterable, mapper, options) => new Promise((resolve, reject) => { isIterableDone = true; if (resolvingCount === 0) { - resolve(ret); + if (!stopOnError && errors.length !== 0) { + reject(new AggregateError(errors)); + } else { + resolve(ret); + } } return; @@ -52,8 +59,14 @@ const pMap = (iterable, mapper, options) => new Promise((resolve, reject) => { next(); }, error => { - isRejected = true; - reject(error); + if (stopOnError) { + isRejected = true; + reject(error); + } else { + errors.push(error); + resolvingCount--; + next(); + } } ); }; diff --git a/index.test-d.ts b/index.test-d.ts index c53d7c7..c08d0ce 100644 --- a/index.test-d.ts +++ b/index.test-d.ts @@ -21,6 +21,7 @@ expectType>(multiResultTypeMapper); expectType({}); expectType({concurrency: 0}); +expectType({stopOnError: false}); expectType>(pMap(sites, asyncMapper)); expectType>(pMap(sites, asyncMapper, {concurrency: 2})); diff --git a/package.json b/package.json index 60a866c..7b0d470 100644 --- a/package.json +++ b/package.json @@ -37,6 +37,9 @@ "parallel", "bluebird" ], + "dependencies": { + "aggregate-error": "^2.0.0" + }, "devDependencies": { "ava": "^1.4.1", "delay": "^4.1.0", diff --git a/readme.md b/readme.md index 166f846..87e7838 100644 --- a/readme.md +++ b/readme.md @@ -68,6 +68,13 @@ Minimum: `1` Number of concurrently pending promises returned by `mapper`. +##### stopOnError + +Type: `boolean`
+Default: `true` + +When set to `false`, instead of stopping when a promise rejects, it will wait for all the promises to settle and then reject with an [aggregated error](https://github.com/sindresorhus/aggregate-error) containing all the errors from the rejected promises. + ## Related diff --git a/test.js b/test.js index e941e9d..227b023 100644 --- a/test.js +++ b/test.js @@ -3,6 +3,7 @@ import delay from 'delay'; import inRange from 'in-range'; import timeSpan from 'time-span'; import randomInt from 'random-int'; +import AggregateError from 'aggregate-error'; import pMap from '.'; const input = [ @@ -11,6 +12,20 @@ const input = [ [30, 100] ]; +const errorInput1 = [ + [20, 200], + [30, 100], + Promise.reject(new Error('foo')), + Promise.reject(new Error('bar')) +]; + +const errorInput2 = [ + [20, 200], + Promise.reject(new Error('bar')), + [30, 100], + Promise.reject(new Error('foo')) +]; + const mapper = ([value, ms]) => delay(ms, {value}); test('main', async t => { @@ -69,3 +84,14 @@ test('enforce number in options.concurrency', async t => { await t.notThrowsAsync(pMap([], () => {}, {concurrency: 10})); await t.notThrowsAsync(pMap([], () => {}, {concurrency: Infinity})); }); + +test('immediately rejects when stopOnError is true', async t => { + await t.throwsAsync(pMap(errorInput1, mapper, {concurrency: 1}), 'foo'); + await t.throwsAsync(pMap(errorInput2, mapper, {concurrency: 1}), 'bar'); +}); + +test('aggregate errors when stopOnError is false', async t => { + await t.notThrowsAsync(pMap(input, mapper, {concurrency: 1, stopOnError: false})); + await t.throwsAsync(pMap(errorInput1, mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /foo(.|\n)*bar/}); + await t.throwsAsync(pMap(errorInput2, mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /bar(.|\n)*foo/}); +});