diff --git a/doc/api/stream.md b/doc/api/stream.md index c8b9e4400e423e..f4549fb7445313 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2118,6 +2118,49 @@ import { Readable } from 'stream'; await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2] ``` +### `readable.reduce(fn[, initial[, options])` + + + +> Stability: 1 - Experimental + +* `fn` {Function|AsyncFunction} a reducer function to call over every chunk + in the stream. + * `previous` {any} the value obtained from the last call to `fn` or the + `initial` value if specified or the first chunk of the stream otherwise. + * `data` {any} a chunk of data from the stream. + * `options` {Object} + * `signal` {AbortSignal} aborted if the stream is destroyed allowing to + abort the `fn` call early. +* `initial` {any} the initial value to use in the reduction. +* `options` {Object} + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Promise} a promise for the final value of the reduction. + +This method calls `fn` on each chunk of the stream in order, passing it the +result from the calculation on the previous element. It returns a promise for +the final value of the reduction. + +The reducer function iterates the stream element-by-element which means that +there is no `concurrency` parameter or parallism. To perform a `reduce` +concurrently, it can be chained to the [`readable.map`][] method. + +If no `initial` value is supplied the first chunk of the stream is used as the +initial value. If the stream is empty, the promise is rejected with a +`TypeError` with the `ERR_INVALID_ARGS` code property. + +```mjs +import { Readable } from 'stream'; + +const ten = await Readable.from([1, 2, 3, 4]).reduce((previous, data) => { + return previous + data; +}); +console.log(ten); // 10 +``` + ### Duplex and transform streams #### Class: `stream.Duplex` @@ -4193,6 +4236,7 @@ contain multi-byte characters. [`process.stdin`]: process.md#processstdin [`process.stdout`]: process.md#processstdout [`readable._read()`]: #readable_readsize +[`readable.map`]: #readablemapfn-options [`readable.push('')`]: #readablepush [`readable.setEncoding()`]: #readablesetencodingencoding [`stream.Readable.from()`]: #streamreadablefromiterable-options diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index a1190fabcf8c00..947728bd788455 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -6,6 +6,7 @@ const { codes: { ERR_INVALID_ARG_TYPE, ERR_OUT_OF_RANGE, + ERR_MISSING_ARGS, }, AbortError, } = require('internal/errors'); @@ -186,8 +187,8 @@ async function every(fn, options) { 'fn', ['Function', 'AsyncFunction'], fn); } // https://en.wikipedia.org/wiki/De_Morgan%27s_laws - return !(await some.call(this, async (x) => { - return !(await fn(x)); + return !(await some.call(this, async (...args) => { + return !(await fn(...args)); }, options)); } @@ -218,11 +219,57 @@ async function * filter(fn, options) { yield* this.map(filterFn, options); } +// Specific to provide better error to reduce since the argument is only +// missing if the stream has no items in it - but the code is still appropriate +class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS { + constructor() { + super('reduce'); + this.message = 'Reduce of an empty stream requires an initial value'; + } +} + +async function reduce(reducer, initialValue, options) { + if (typeof reducer !== 'function') { + throw new ERR_INVALID_ARG_TYPE( + 'reducer', ['Function', 'AsyncFunction'], reducer); + } + let hasInitialValue = arguments.length > 1; + if (options?.signal?.aborted) { + const err = new AbortError(undefined, { cause: options.signal.reason }); + this.once('error', () => {}); // The error is already propagated + this.destroy(err); + throw err; + } + const ac = new AbortController(); + const signal = ac.signal; + let gotAnyItemFromStream = false; + try { + for await (const value of this) { + gotAnyItemFromStream = true; + if (options?.signal?.aborted) { + throw new AbortError(); + } + if (!hasInitialValue) { + initialValue = value; + hasInitialValue = true; + } else { + initialValue = await reducer(initialValue, value, { signal }); + } + } + if (!gotAnyItemFromStream && !hasInitialValue) { + throw new ReduceAwareErrMissingArgs(); + } + } finally { + ac.abort(); + } + return initialValue; +} + async function toArray(options) { const result = []; for await (const val of this) { if (options?.signal?.aborted) { - throw new AbortError({ cause: options.signal.reason }); + throw new AbortError(undefined, { cause: options.signal.reason }); } ArrayPrototypePush(result, val); } @@ -296,6 +343,7 @@ module.exports.streamReturningOperators = { module.exports.promiseReturningOperators = { every, forEach, + reduce, toArray, some, }; diff --git a/test/parallel/test-stream-reduce.js b/test/parallel/test-stream-reduce.js new file mode 100644 index 00000000000000..526f56acafc89e --- /dev/null +++ b/test/parallel/test-stream-reduce.js @@ -0,0 +1,112 @@ +'use strict'; + +const common = require('../common'); +const { + Readable, +} = require('stream'); +const assert = require('assert'); + +function sum(p, c) { + return p + c; +} + +{ + // Does the same thing as `(await stream.toArray()).reduce(...)` + (async () => { + const tests = [ + [[], sum, 0], + [[1], sum, 0], + [[1, 2, 3, 4, 5], sum, 0], + [Array(100).fill().map((_, i) => i), sum, 0], + [['a', 'b', 'c'], sum, ''], + [[1, 2], sum], + [[1, 2, 3], (x, y) => y], + ]; + for (const [values, fn, initial] of tests) { + const streamReduce = await Readable.from(values) + .reduce(fn, initial); + const arrayReduce = values.reduce(fn, initial); + assert.deepStrictEqual(streamReduce, arrayReduce); + } + // Does the same thing as `(await stream.toArray()).reduce(...)` with an + // asynchronous reducer + for (const [values, fn, initial] of tests) { + const streamReduce = await Readable.from(values) + .map(async (x) => x) + .reduce(fn, initial); + const arrayReduce = values.reduce(fn, initial); + assert.deepStrictEqual(streamReduce, arrayReduce); + } + })().then(common.mustCall()); +} +{ + // Works with an async reducer, with or without initial value + (async () => { + const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c, 0); + assert.strictEqual(six, 6); + })().then(common.mustCall()); + (async () => { + const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c); + assert.strictEqual(six, 6); + })().then(common.mustCall()); +} +{ + // Works lazily + assert.rejects(Readable.from([1, 2, 3, 4, 5, 6]) + .map(common.mustCall((x) => { + return x; + }, 3)) // Two consumed and one buffered by `map` due to default concurrency + .reduce(async (p, c) => { + if (p === 1) { + throw new Error('boom'); + } + return c; + }, 0) + , /boom/).then(common.mustCall()); +} + +{ + // Support for AbortSignal + const ac = new AbortController(); + assert.rejects(async () => { + await Readable.from([1, 2, 3]).reduce(async (p, c) => { + if (c === 3) { + await new Promise(() => {}); // Explicitly do not pass signal here + } + return Promise.resolve(); + }, 0, { signal: ac.signal }); + }, { + name: 'AbortError', + }).then(common.mustCall()); + ac.abort(); +} + + +{ + // Support for AbortSignal - pre aborted + const stream = Readable.from([1, 2, 3]); + assert.rejects(async () => { + await stream.reduce(async (p, c) => { + if (c === 3) { + await new Promise(() => {}); // Explicitly do not pass signal here + } + return Promise.resolve(); + }, 0, { signal: AbortSignal.abort() }); + }, { + name: 'AbortError', + }).then(common.mustCall(() => { + assert.strictEqual(stream.destroyed, true); + })); +} + +{ + // Error cases + assert.rejects(() => Readable.from([]).reduce(1), /TypeError/); + assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/); +} + +{ + // Test result is a Promise + const result = Readable.from([1, 2, 3, 4, 5]).reduce(sum, 0); + assert.ok(result instanceof Promise); +}