Skip to content

Commit

Permalink
stream: add reduce
Browse files Browse the repository at this point in the history
  • Loading branch information
benjamingr committed Jan 27, 2022
1 parent 7752eed commit 9976118
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 3 deletions.
44 changes: 44 additions & 0 deletions doc/api/stream.md
Expand Up @@ -2142,6 +2142,49 @@ const pairs = await Readable.from(['a', 'b', 'c']).asIndexedPairs().toArray();
console.log(pairs); // [[0, 'a'], [1, 'b'], [2, 'c']]
```

### `readable.reduce(fn[, initial[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `fn` {Function|AsyncFunction} a reducer function to call over every chunk
in the stream.
* `previous` {any} the value obtained from the last call to `fn` or the
`initial` value if specified or the first chunk of the stream otherwise.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `initial` {any} the initial value to use in the reduction.
* `options` {Object}
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Promise} a promise for the final value of the reduction.

This method calls `fn` on each chunk of the stream in order, passing it the
result from the calculation on the previous element. It returns a promise for
the final value of the reduction.

The reducer function iterates the stream element-by-element which means that
there is no `concurrency` parameter or parallism. To perform a `reduce`
concurrently, it can be chained to the [`readable.map`][] method.

If no `initial` value is supplied the first chunk of the stream is used as the
initial value. If the stream is empty, the promise is rejected with a
`TypeError` with the `ERR_INVALID_ARGS` code property.

```mjs
import { Readable } from 'stream';

const ten = await Readable.from([1, 2, 3, 4]).reduce((previous, data) => {
return previous + data;
});
console.log(ten); // 10
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down Expand Up @@ -4217,6 +4260,7 @@ contain multi-byte characters.
[`process.stdin`]: process.md#processstdin
[`process.stdout`]: process.md#processstdout
[`readable._read()`]: #readable_readsize
[`readable.map`]: #readablemapfn-options
[`readable.push('')`]: #readablepush
[`readable.setEncoding()`]: #readablesetencodingencoding
[`stream.Readable.from()`]: #streamreadablefromiterable-options
Expand Down
54 changes: 51 additions & 3 deletions lib/internal/streams/operators.js
Expand Up @@ -6,6 +6,7 @@ const {
codes: {
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE,
ERR_MISSING_ARGS,
},
AbortError,
} = require('internal/errors');
Expand Down Expand Up @@ -196,8 +197,8 @@ async function every(fn, options) {
'fn', ['Function', 'AsyncFunction'], fn);
}
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
return !(await some.call(this, async (x) => {
return !(await fn(x));
return !(await some.call(this, async (...args) => {
return !(await fn(...args));
}, options));
}

Expand Down Expand Up @@ -228,11 +229,57 @@ async function * filter(fn, options) {
yield* this.map(filterFn, options);
}

// Specific to provide better error to reduce since the argument is only
// missing if the stream has no items in it - but the code is still appropriate
class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
constructor() {
super('reduce');
this.message = 'Reduce of an empty stream requires an initial value';
}
}

async function reduce(reducer, initialValue, options) {
if (typeof reducer !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'reducer', ['Function', 'AsyncFunction'], reducer);
}
let hasInitialValue = arguments.length > 1;
if (options?.signal?.aborted) {
const err = new AbortError(undefined, { cause: options.signal.reason });
this.once('error', () => {}); // The error is already propagated
this.destroy(err);
throw err;
}
const ac = new AbortController();
const signal = ac.signal;
let gotAnyItemFromStream = false;
try {
for await (const value of this) {
gotAnyItemFromStream = true;
if (options?.signal?.aborted) {
throw new AbortError();
}
if (!hasInitialValue) {
initialValue = value;
hasInitialValue = true;
} else {
initialValue = await reducer(initialValue, value, { signal });
}
}
if (!gotAnyItemFromStream && !hasInitialValue) {
throw new ReduceAwareErrMissingArgs();
}
} finally {
ac.abort();
}
return initialValue;
}

async function toArray(options) {
const result = [];
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError({ cause: options.signal.reason });
throw new AbortError(undefined, { cause: options.signal.reason });
}
ArrayPrototypePush(result, val);
}
Expand Down Expand Up @@ -307,6 +354,7 @@ module.exports.streamReturningOperators = {
module.exports.promiseReturningOperators = {
every,
forEach,
reduce,
toArray,
some,
};
112 changes: 112 additions & 0 deletions test/parallel/test-stream-reduce.js
@@ -0,0 +1,112 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');

function sum(p, c) {
return p + c;
}

{
// Does the same thing as `(await stream.toArray()).reduce(...)`
(async () => {
const tests = [
[[], sum, 0],
[[1], sum, 0],
[[1, 2, 3, 4, 5], sum, 0],
[[...Array(100).keys()], sum, 0],
[['a', 'b', 'c'], sum, ''],
[[1, 2], sum],
[[1, 2, 3], (x, y) => y],
];
for (const [values, fn, initial] of tests) {
const streamReduce = await Readable.from(values)
.reduce(fn, initial);
const arrayReduce = values.reduce(fn, initial);
assert.deepStrictEqual(streamReduce, arrayReduce);
}
// Does the same thing as `(await stream.toArray()).reduce(...)` with an
// asynchronous reducer
for (const [values, fn, initial] of tests) {
const streamReduce = await Readable.from(values)
.map(async (x) => x)
.reduce(fn, initial);
const arrayReduce = values.reduce(fn, initial);
assert.deepStrictEqual(streamReduce, arrayReduce);
}
})().then(common.mustCall());
}
{
// Works with an async reducer, with or without initial value
(async () => {
const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c, 0);
assert.strictEqual(six, 6);
})().then(common.mustCall());
(async () => {
const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c);
assert.strictEqual(six, 6);
})().then(common.mustCall());
}
{
// Works lazily
assert.rejects(Readable.from([1, 2, 3, 4, 5, 6])
.map(common.mustCall((x) => {
return x;
}, 3)) // Two consumed and one buffered by `map` due to default concurrency
.reduce(async (p, c) => {
if (p === 1) {
throw new Error('boom');
}
return c;
}, 0)
, /boom/).then(common.mustCall());
}

{
// Support for AbortSignal
const ac = new AbortController();
assert.rejects(async () => {
await Readable.from([1, 2, 3]).reduce(async (p, c) => {
if (c === 3) {
await new Promise(() => {}); // Explicitly do not pass signal here
}
return Promise.resolve();
}, 0, { signal: ac.signal });
}, {
name: 'AbortError',
}).then(common.mustCall());
ac.abort();
}


{
// Support for AbortSignal - pre aborted
const stream = Readable.from([1, 2, 3]);
assert.rejects(async () => {
await stream.reduce(async (p, c) => {
if (c === 3) {
await new Promise(() => {}); // Explicitly do not pass signal here
}
return Promise.resolve();
}, 0, { signal: AbortSignal.abort() });
}, {
name: 'AbortError',
}).then(common.mustCall(() => {
assert.strictEqual(stream.destroyed, true);
}));
}

{
// Error cases
assert.rejects(() => Readable.from([]).reduce(1), /TypeError/);
assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/);
}

{
// Test result is a Promise
const result = Readable.from([1, 2, 3, 4, 5]).reduce(sum, 0);
assert.ok(result instanceof Promise);
}

0 comments on commit 9976118

Please sign in to comment.