From f12cf6db47954f4b1f4d6351837a303cd1369f1e Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Sun, 30 Jan 2022 16:07:32 +0200 Subject: [PATCH] stream: add reduce PR-URL: https://github.com/nodejs/node/pull/41669 Reviewed-By: Matteo Collina --- doc/api/stream.md | 44 +++++++++ lib/internal/streams/end-of-stream.js | 15 +++ lib/internal/streams/operators.js | 59 +++++++++++- lib/stream/promises.js | 14 +-- test/parallel/test-stream-reduce.js | 130 ++++++++++++++++++++++++++ 5 files changed, 246 insertions(+), 16 deletions(-) create mode 100644 test/parallel/test-stream-reduce.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 2d75842279f498..0b4154d9b01562 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2089,6 +2089,49 @@ const pairs = await Readable.from(['a', 'b', 'c']).asIndexedPairs().toArray(); console.log(pairs); // [[0, 'a'], [1, 'b'], [2, 'c']] ``` +### `readable.reduce(fn[, initial[, options]])` + + + +> Stability: 1 - Experimental + +* `fn` {Function|AsyncFunction} a reducer function to call over every chunk + in the stream. + * `previous` {any} the value obtained from the last call to `fn` or the + `initial` value if specified or the first chunk of the stream otherwise. + * `data` {any} a chunk of data from the stream. + * `options` {Object} + * `signal` {AbortSignal} aborted if the stream is destroyed allowing to + abort the `fn` call early. +* `initial` {any} the initial value to use in the reduction. +* `options` {Object} + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Promise} a promise for the final value of the reduction. + +This method calls `fn` on each chunk of the stream in order, passing it the +result from the calculation on the previous element. It returns a promise for +the final value of the reduction. + +The reducer function iterates the stream element-by-element which means that +there is no `concurrency` parameter or parallism. To perform a `reduce` +concurrently, it can be chained to the [`readable.map`][] method. + +If no `initial` value is supplied the first chunk of the stream is used as the +initial value. If the stream is empty, the promise is rejected with a +`TypeError` with the `ERR_INVALID_ARGS` code property. + +```mjs +import { Readable } from 'stream'; + +const ten = await Readable.from([1, 2, 3, 4]).reduce((previous, data) => { + return previous + data; +}); +console.log(ten); // 10 +``` + ### Duplex and transform streams #### Class: `stream.Duplex` @@ -4137,6 +4180,7 @@ contain multi-byte characters. [`process.stdin`]: process.md#processstdin [`process.stdout`]: process.md#processstdout [`readable._read()`]: #readable_readsize +[`readable.map`]: #readablemapfn-options [`readable.push('')`]: #readablepush [`readable.setEncoding()`]: #readablesetencodingencoding [`stream.Readable.from()`]: #streamreadablefromiterable-options diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 4702ddd6d53423..9edd6cc9defafb 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -17,6 +17,8 @@ const { validateObject, } = require('internal/validators'); +const { Promise } = primordials; + function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; } @@ -232,4 +234,17 @@ function eos(stream, options, callback) { return cleanup; } +function finished(stream, opts) { + return new Promise((resolve, reject) => { + eos(stream, opts, (err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); +} + module.exports = eos; +module.exports.finished = finished; diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 0c55e9ad758fa0..42807edf1c7eca 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -6,12 +6,14 @@ const { Buffer } = require('buffer'); const { codes: { ERR_INVALID_ARG_TYPE, + ERR_MISSING_ARGS, ERR_OUT_OF_RANGE, }, AbortError, } = require('internal/errors'); const { validateInteger } = require('internal/validators'); const { kWeakHandler } = require('internal/event_target'); +const { finished } = require('internal/streams/end-of-stream'); const { ArrayPrototypePush, @@ -199,8 +201,8 @@ async function every(fn, options) { 'fn', ['Function', 'AsyncFunction'], fn); } // https://en.wikipedia.org/wiki/De_Morgan%27s_laws - return !(await some.call(this, async (x) => { - return !(await fn(x)); + return !(await some.call(this, async (...args) => { + return !(await fn(...args)); }, options)); } @@ -231,11 +233,61 @@ function filter(fn, options) { return this.map(filterFn, options); } +// Specific to provide better error to reduce since the argument is only +// missing if the stream has no items in it - but the code is still appropriate +class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS { + constructor() { + super('reduce'); + this.message = 'Reduce of an empty stream requires an initial value'; + } +} + +async function reduce(reducer, initialValue, options) { + if (typeof reducer !== 'function') { + throw new ERR_INVALID_ARG_TYPE( + 'reducer', ['Function', 'AsyncFunction'], reducer); + } + let hasInitialValue = arguments.length > 1; + if (options?.signal?.aborted) { + const err = new AbortError(undefined, { cause: options.signal.reason }); + this.once('error', () => {}); // The error is already propagated + await finished(this.destroy(err)); + throw err; + } + const ac = new AbortController(); + const signal = ac.signal; + if (options?.signal) { + const opts = { once: true, [kWeakHandler]: this }; + options.signal.addEventListener('abort', () => ac.abort(), opts); + } + let gotAnyItemFromStream = false; + try { + for await (const value of this) { + gotAnyItemFromStream = true; + if (options?.signal?.aborted) { + throw new AbortError(); + } + if (!hasInitialValue) { + initialValue = value; + hasInitialValue = true; + } else { + initialValue = await reducer(initialValue, value, { signal }); + } + } + if (!gotAnyItemFromStream && !hasInitialValue) { + throw new ReduceAwareErrMissingArgs(); + } + } finally { + ac.abort(); + } + return initialValue; +} + async function toArray(options) { const result = []; for await (const val of this) { if (options?.signal?.aborted) { - throw new AbortError({ cause: options.signal.reason }); + throw new AbortError(undefined, { cause: options.signal.reason }); } ArrayPrototypePush(result, val); } @@ -316,6 +368,7 @@ module.exports.streamReturningOperators = { module.exports.promiseReturningOperators = { every, forEach, + reduce, toArray, some, }; diff --git a/lib/stream/promises.js b/lib/stream/promises.js index 2fdcad3cc4aa3a..7a896f87b14392 100644 --- a/lib/stream/promises.js +++ b/lib/stream/promises.js @@ -11,7 +11,7 @@ const { } = require('internal/streams/utils'); const { pipelineImpl: pl } = require('internal/streams/pipeline'); -const eos = require('internal/streams/end-of-stream'); +const { finished } = require('internal/streams/end-of-stream'); function pipeline(...streams) { return new Promise((resolve, reject) => { @@ -35,18 +35,6 @@ function pipeline(...streams) { }); } -function finished(stream, opts) { - return new Promise((resolve, reject) => { - eos(stream, opts, (err) => { - if (err) { - reject(err); - } else { - resolve(); - } - }); - }); -} - module.exports = { finished, pipeline, diff --git a/test/parallel/test-stream-reduce.js b/test/parallel/test-stream-reduce.js new file mode 100644 index 00000000000000..a8b41efa28415d --- /dev/null +++ b/test/parallel/test-stream-reduce.js @@ -0,0 +1,130 @@ +'use strict'; + +const common = require('../common'); +const { + Readable, +} = require('stream'); +const assert = require('assert'); + +function sum(p, c) { + return p + c; +} + +{ + // Does the same thing as `(await stream.toArray()).reduce(...)` + (async () => { + const tests = [ + [[], sum, 0], + [[1], sum, 0], + [[1, 2, 3, 4, 5], sum, 0], + [[...Array(100).keys()], sum, 0], + [['a', 'b', 'c'], sum, ''], + [[1, 2], sum], + [[1, 2, 3], (x, y) => y], + ]; + for (const [values, fn, initial] of tests) { + const streamReduce = await Readable.from(values) + .reduce(fn, initial); + const arrayReduce = values.reduce(fn, initial); + assert.deepStrictEqual(streamReduce, arrayReduce); + } + // Does the same thing as `(await stream.toArray()).reduce(...)` with an + // asynchronous reducer + for (const [values, fn, initial] of tests) { + const streamReduce = await Readable.from(values) + .map(async (x) => x) + .reduce(fn, initial); + const arrayReduce = values.reduce(fn, initial); + assert.deepStrictEqual(streamReduce, arrayReduce); + } + })().then(common.mustCall()); +} +{ + // Works with an async reducer, with or without initial value + (async () => { + const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c, 0); + assert.strictEqual(six, 6); + })().then(common.mustCall()); + (async () => { + const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c); + assert.strictEqual(six, 6); + })().then(common.mustCall()); +} +{ + // Works lazily + assert.rejects(Readable.from([1, 2, 3, 4, 5, 6]) + .map(common.mustCall((x) => { + return x; + }, 3)) // Two consumed and one buffered by `map` due to default concurrency + .reduce(async (p, c) => { + if (p === 1) { + throw new Error('boom'); + } + return c; + }, 0) + , /boom/).then(common.mustCall()); +} + +{ + // Support for AbortSignal + const ac = new AbortController(); + assert.rejects(async () => { + await Readable.from([1, 2, 3]).reduce(async (p, c) => { + if (c === 3) { + await new Promise(() => {}); // Explicitly do not pass signal here + } + return Promise.resolve(); + }, 0, { signal: ac.signal }); + }, { + name: 'AbortError', + }).then(common.mustCall()); + ac.abort(); +} + + +{ + // Support for AbortSignal - pre aborted + const stream = Readable.from([1, 2, 3]); + assert.rejects(async () => { + await stream.reduce(async (p, c) => { + if (c === 3) { + await new Promise(() => {}); // Explicitly do not pass signal here + } + return Promise.resolve(); + }, 0, { signal: AbortSignal.abort() }); + }, { + name: 'AbortError', + }).then(common.mustCall(() => { + assert.strictEqual(stream.destroyed, true); + })); +} + +{ + // Support for AbortSignal - deep + const stream = Readable.from([1, 2, 3]); + assert.rejects(async () => { + await stream.reduce(async (p, c, { signal }) => { + signal.addEventListener('abort', common.mustCall(), { once: true }); + if (c === 3) { + await new Promise(() => {}); // Explicitly do not pass signal here + } + return Promise.resolve(); + }, 0, { signal: AbortSignal.abort() }); + }, { + name: 'AbortError', + }).then(common.mustCall(() => { + assert.strictEqual(stream.destroyed, true); + })); +} + +{ + // Error cases + assert.rejects(() => Readable.from([]).reduce(1), /TypeError/); + assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/); +} + +{ + // Test result is a Promise + const result = Readable.from([1, 2, 3, 4, 5]).reduce(sum, 0); + assert.ok(result instanceof Promise); +}