Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
stream: add reduce
PR-URL: #41669
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
benjamingr authored and danielleadams committed Apr 21, 2022
1 parent 4b63439 commit f12cf6d
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 16 deletions.
44 changes: 44 additions & 0 deletions doc/api/stream.md
Expand Up @@ -2089,6 +2089,49 @@ const pairs = await Readable.from(['a', 'b', 'c']).asIndexedPairs().toArray();
console.log(pairs); // [[0, 'a'], [1, 'b'], [2, 'c']]
```

### `readable.reduce(fn[, initial[, options]])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `fn` {Function|AsyncFunction} a reducer function to call over every chunk
in the stream.
  * `previous` {any} the value returned by the previous call to `fn`, or the
    `initial` value if supplied, or otherwise the first chunk of the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `initial` {any} the initial value to use in the reduction.
* `options` {Object}
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Promise} a promise for the final value of the reduction.

This method calls `fn` on each chunk of the stream in order, passing it the
result from the calculation on the previous element. It returns a promise for
the final value of the reduction.

The reducer function iterates the stream element-by-element which means that
there is no `concurrency` parameter or parallelism. To perform a `reduce`
concurrently, it can be chained to the [`readable.map`][] method.

If no `initial` value is supplied the first chunk of the stream is used as the
initial value. If the stream is empty, the promise is rejected with a
`TypeError` with the `ERR_MISSING_ARGS` code property.

```mjs
import { Readable } from 'stream';

const ten = await Readable.from([1, 2, 3, 4]).reduce((previous, data) => {
return previous + data;
});
console.log(ten); // 10
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down Expand Up @@ -4137,6 +4180,7 @@ contain multi-byte characters.
[`process.stdin`]: process.md#processstdin
[`process.stdout`]: process.md#processstdout
[`readable._read()`]: #readable_readsize
[`readable.map`]: #readablemapfn-options
[`readable.push('')`]: #readablepush
[`readable.setEncoding()`]: #readablesetencodingencoding
[`stream.Readable.from()`]: #streamreadablefromiterable-options
Expand Down
15 changes: 15 additions & 0 deletions lib/internal/streams/end-of-stream.js
Expand Up @@ -17,6 +17,8 @@ const {
validateObject,
} = require('internal/validators');

const { Promise } = primordials;

function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}
Expand Down Expand Up @@ -232,4 +234,17 @@ function eos(stream, options, callback) {
return cleanup;
}

// Promise-based variant of `eos`: resolves once `eos` reports the stream has
// ended, rejects with the error `eos` reports otherwise.
function finished(stream, opts) {
  return new Promise((resolve, reject) => {
    eos(stream, opts, (err) => (err ? reject(err) : resolve()));
  });
}

module.exports = eos;
module.exports.finished = finished;
59 changes: 56 additions & 3 deletions lib/internal/streams/operators.js
Expand Up @@ -6,12 +6,14 @@ const { Buffer } = require('buffer');
const {
codes: {
ERR_INVALID_ARG_TYPE,
ERR_MISSING_ARGS,
ERR_OUT_OF_RANGE,
},
AbortError,
} = require('internal/errors');
const { validateInteger } = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');
const { finished } = require('internal/streams/end-of-stream');

const {
ArrayPrototypePush,
Expand Down Expand Up @@ -199,8 +201,8 @@ async function every(fn, options) {
'fn', ['Function', 'AsyncFunction'], fn);
}
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
return !(await some.call(this, async (x) => {
return !(await fn(x));
return !(await some.call(this, async (...args) => {
return !(await fn(...args));
}, options));
}

Expand Down Expand Up @@ -231,11 +233,61 @@ function filter(fn, options) {
return this.map(filterFn, options);
}

// Specific to provide better error to reduce since the argument is only
// missing if the stream has no items in it - but the code is still appropriate
class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
  // Takes no arguments: the "missing" argument is always reduce's initial
  // value, so the base class is constructed with the fixed name 'reduce'.
  constructor() {
    super('reduce');
    // Replace the generic ERR_MISSING_ARGS text with wording that mirrors
    // the error thrown by Array.prototype.reduce on an empty array.
    this.message = 'Reduce of an empty stream requires an initial value';
  }
}

async function reduce(reducer, initialValue, options) {
if (typeof reducer !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'reducer', ['Function', 'AsyncFunction'], reducer);
}
let hasInitialValue = arguments.length > 1;
if (options?.signal?.aborted) {
const err = new AbortError(undefined, { cause: options.signal.reason });
this.once('error', () => {}); // The error is already propagated
await finished(this.destroy(err));
throw err;
}
const ac = new AbortController();
const signal = ac.signal;
if (options?.signal) {
const opts = { once: true, [kWeakHandler]: this };
options.signal.addEventListener('abort', () => ac.abort(), opts);
}
let gotAnyItemFromStream = false;
try {
for await (const value of this) {
gotAnyItemFromStream = true;
if (options?.signal?.aborted) {
throw new AbortError();
}
if (!hasInitialValue) {
initialValue = value;
hasInitialValue = true;
} else {
initialValue = await reducer(initialValue, value, { signal });
}
}
if (!gotAnyItemFromStream && !hasInitialValue) {
throw new ReduceAwareErrMissingArgs();
}
} finally {
ac.abort();
}
return initialValue;
}

async function toArray(options) {
const result = [];
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError({ cause: options.signal.reason });
throw new AbortError(undefined, { cause: options.signal.reason });
}
ArrayPrototypePush(result, val);
}
Expand Down Expand Up @@ -316,6 +368,7 @@ module.exports.streamReturningOperators = {
module.exports.promiseReturningOperators = {
every,
forEach,
reduce,
toArray,
some,
};
14 changes: 1 addition & 13 deletions lib/stream/promises.js
Expand Up @@ -11,7 +11,7 @@ const {
} = require('internal/streams/utils');

const { pipelineImpl: pl } = require('internal/streams/pipeline');
const eos = require('internal/streams/end-of-stream');
const { finished } = require('internal/streams/end-of-stream');

function pipeline(...streams) {
return new Promise((resolve, reject) => {
Expand All @@ -35,18 +35,6 @@ function pipeline(...streams) {
});
}

function finished(stream, opts) {
return new Promise((resolve, reject) => {
eos(stream, opts, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}

module.exports = {
finished,
pipeline,
Expand Down
130 changes: 130 additions & 0 deletions test/parallel/test-stream-reduce.js
@@ -0,0 +1,130 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');

// Simple addition reducer (works for both numbers and strings) shared by the
// equivalence tests below.
function sum(previous, current) {
  return previous + current;
}

{
  // Does the same thing as `(await stream.toArray()).reduce(...)`
  (async () => {
    // Each entry: [input values, reducer, (optional) initial value]. The last
    // two entries cover an undefined initial value and a reducer that ignores
    // its accumulator.
    const tests = [
      [[], sum, 0],
      [[1], sum, 0],
      [[1, 2, 3, 4, 5], sum, 0],
      [[...Array(100).keys()], sum, 0],
      [['a', 'b', 'c'], sum, ''],
      [[1, 2], sum],
      [[1, 2, 3], (x, y) => y],
    ];
    for (const [values, fn, initial] of tests) {
      const streamReduce = await Readable.from(values)
        .reduce(fn, initial);
      // Array.prototype.reduce serves as the reference implementation.
      const arrayReduce = values.reduce(fn, initial);
      assert.deepStrictEqual(streamReduce, arrayReduce);
    }
    // Does the same thing as `(await stream.toArray()).reduce(...)` with an
    // asynchronous reducer
    for (const [values, fn, initial] of tests) {
      const streamReduce = await Readable.from(values)
        .map(async (x) => x)
        .reduce(fn, initial);
      const arrayReduce = values.reduce(fn, initial);
      assert.deepStrictEqual(streamReduce, arrayReduce);
    }
  })().then(common.mustCall());
}
{
  // Works with an async reducer, with or without initial value
  (async () => {
    const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c, 0);
    assert.strictEqual(six, 6);
  })().then(common.mustCall());
  (async () => {
    // No initial value: the first chunk (1) seeds the accumulator.
    const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c);
    assert.strictEqual(six, 6);
  })().then(common.mustCall());
}
{
  // Works lazily
  // mustCall(..., 3) pins how many chunks flow through `map` before the
  // rejection below stops the pipeline — proving reduce pulls on demand.
  assert.rejects(Readable.from([1, 2, 3, 4, 5, 6])
    .map(common.mustCall((x) => {
      return x;
    }, 3)) // Two consumed and one buffered by `map` due to default concurrency
    .reduce(async (p, c) => {
      // `p === 1` only on the second invocation (initial 0, then reducer
      // returned the first chunk), so the failure happens mid-stream.
      if (p === 1) {
        throw new Error('boom');
      }
      return c;
    }, 0)
  , /boom/).then(common.mustCall());
}

{
  // Support for AbortSignal
  const ac = new AbortController();
  assert.rejects(async () => {
    await Readable.from([1, 2, 3]).reduce(async (p, c) => {
      if (c === 3) {
        // Block forever on the last chunk so only the abort can settle this.
        await new Promise(() => {}); // Explicitly do not pass signal here
      }
      return Promise.resolve();
    }, 0, { signal: ac.signal });
  }, {
    name: 'AbortError',
  }).then(common.mustCall());
  // Abort after the reduction has started to exercise the in-flight path.
  ac.abort();
}


{
// Support for AbortSignal - pre aborted
const stream = Readable.from([1, 2, 3]);
assert.rejects(async () => {
await stream.reduce(async (p, c) => {
if (c === 3) {
await new Promise(() => {}); // Explicitly do not pass signal here
}
return Promise.resolve();
}, 0, { signal: AbortSignal.abort() });
}, {
name: 'AbortError',
}).then(common.mustCall(() => {
assert.strictEqual(stream.destroyed, true);
}));
}

{
  // Support for AbortSignal - deep
  const stream = Readable.from([1, 2, 3]);
  assert.rejects(async () => {
    // The reducer's third argument is an options bag carrying an AbortSignal.
    // NOTE(review): with a pre-aborted outer signal the reducer may never be
    // invoked at all; the inner listener is only registered if it runs.
    await stream.reduce(async (p, c, { signal }) => {
      signal.addEventListener('abort', common.mustCall(), { once: true });
      if (c === 3) {
        await new Promise(() => {}); // Explicitly do not pass signal here
      }
      return Promise.resolve();
    }, 0, { signal: AbortSignal.abort() });
  }, {
    name: 'AbortError',
  }).then(common.mustCall(() => {
    assert.strictEqual(stream.destroyed, true);
  }));
}

{
  // Error cases: the reducer argument must be a function.
  // Chain `.then(common.mustCall())` (as the other cases in this file do) so
  // the rejection assertions are not floating promises and their completion
  // is verified.
  assert.rejects(() => Readable.from([]).reduce(1), /TypeError/)
    .then(common.mustCall());
  assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/)
    .then(common.mustCall());
}

{
  // The value returned by reduce is a Promise, even before consumption starts.
  const pending = Readable.from([1, 2, 3, 4, 5]).reduce(sum, 0);
  assert.ok(pending instanceof Promise);
}

0 comments on commit f12cf6d

Please sign in to comment.