Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add reduce #41669

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 44 additions & 0 deletions doc/api/stream.md
Expand Up @@ -2142,6 +2142,49 @@ const pairs = await Readable.from(['a', 'b', 'c']).asIndexedPairs().toArray();
console.log(pairs); // [[0, 'a'], [1, 'b'], [2, 'c']]
```

### `readable.reduce(fn[, initial[, options]])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a reducer function to call over every chunk
in the stream.
* `previous` {any} the value obtained from the last call to `fn` or the
`initial` value if specified or the first chunk of the stream otherwise.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `initial` {any} the initial value to use in the reduction.
* `options` {Object}
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Promise} a promise for the final value of the reduction.

This method calls `fn` on each chunk of the stream in order, passing it the
result from the calculation on the previous element. It returns a promise for
the final value of the reduction.
benjamingr marked this conversation as resolved.
Show resolved Hide resolved

The reducer function iterates the stream element-by-element which means that
there is no `concurrency` parameter or parallism. To perform a `reduce`
concurrently, it can be chained to the [`readable.map`][] method.

If no `initial` value is supplied the first chunk of the stream is used as the
initial value. If the stream is empty, the promise is rejected with a
`TypeError` with the `ERR_INVALID_ARGS` code property.

```mjs
import { Readable } from 'stream';

const ten = await Readable.from([1, 2, 3, 4]).reduce((previous, data) => {
return previous + data;
});
console.log(ten); // 10
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down Expand Up @@ -4217,6 +4260,7 @@ contain multi-byte characters.
[`process.stdin`]: process.md#processstdin
[`process.stdout`]: process.md#processstdout
[`readable._read()`]: #readable_readsize
[`readable.map`]: #readablemapfn-options
[`readable.push('')`]: #readablepush
[`readable.setEncoding()`]: #readablesetencodingencoding
[`stream.Readable.from()`]: #streamreadablefromiterable-options
Expand Down
15 changes: 15 additions & 0 deletions lib/internal/streams/end-of-stream.js
Expand Up @@ -17,6 +17,8 @@ const {
validateObject,
} = require('internal/validators');

const { Promise } = primordials;

const {
isClosed,
isReadable,
Expand Down Expand Up @@ -236,4 +238,17 @@ function eos(stream, options, callback) {
return cleanup;
}

function finished(stream, opts) {
return new Promise((resolve, reject) => {
eos(stream, opts, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}

module.exports = eos;
module.exports.finished = finished;
59 changes: 56 additions & 3 deletions lib/internal/streams/operators.js
Expand Up @@ -5,12 +5,14 @@ const { AbortController } = require('internal/abort_controller');
const {
codes: {
ERR_INVALID_ARG_TYPE,
ERR_MISSING_ARGS,
ERR_OUT_OF_RANGE,
},
AbortError,
} = require('internal/errors');
const { validateInteger } = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');
const { finished } = require('internal/streams/end-of-stream');

const {
ArrayPrototypePush,
Expand Down Expand Up @@ -196,8 +198,8 @@ async function every(fn, options) {
'fn', ['Function', 'AsyncFunction'], fn);
}
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
return !(await some.call(this, async (x) => {
return !(await fn(x));
return !(await some.call(this, async (...args) => {
return !(await fn(...args));
}, options));
}

Expand Down Expand Up @@ -228,11 +230,61 @@ async function * filter(fn, options) {
yield* this.map(filterFn, options);
}

// Specific to provide better error to reduce since the argument is only
// missing if the stream has no items in it - but the code is still appropriate
class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
constructor() {
super('reduce');
this.message = 'Reduce of an empty stream requires an initial value';
}
}

async function reduce(reducer, initialValue, options) {
if (typeof reducer !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'reducer', ['Function', 'AsyncFunction'], reducer);
}
let hasInitialValue = arguments.length > 1;
if (options?.signal?.aborted) {
const err = new AbortError(undefined, { cause: options.signal.reason });
this.once('error', () => {}); // The error is already propagated
await finished(this.destroy(err));
throw err;
}
const ac = new AbortController();
const signal = ac.signal;
benjamingr marked this conversation as resolved.
Show resolved Hide resolved
if (options?.signal) {
const opts = { once: true, [kWeakHandler]: this };
options.signal.addEventListener('abort', () => ac.abort(), opts);
}
let gotAnyItemFromStream = false;
try {
for await (const value of this) {
gotAnyItemFromStream = true;
if (options?.signal?.aborted) {
throw new AbortError();
}
if (!hasInitialValue) {
initialValue = value;
hasInitialValue = true;
} else {
initialValue = await reducer(initialValue, value, { signal });
}
}
if (!gotAnyItemFromStream && !hasInitialValue) {
throw new ReduceAwareErrMissingArgs();
Mesteery marked this conversation as resolved.
Show resolved Hide resolved
}
} finally {
ac.abort();
}
return initialValue;
}

async function toArray(options) {
const result = [];
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError({ cause: options.signal.reason });
throw new AbortError(undefined, { cause: options.signal.reason });
}
ArrayPrototypePush(result, val);
}
Expand Down Expand Up @@ -307,6 +359,7 @@ module.exports.streamReturningOperators = {
module.exports.promiseReturningOperators = {
every,
forEach,
reduce,
toArray,
some,
};
14 changes: 1 addition & 13 deletions lib/stream/promises.js
Expand Up @@ -11,7 +11,7 @@ const {
} = require('internal/streams/utils');

const { pipelineImpl: pl } = require('internal/streams/pipeline');
const eos = require('internal/streams/end-of-stream');
const { finished } = require('internal/streams/end-of-stream');

function pipeline(...streams) {
return new Promise((resolve, reject) => {
Expand All @@ -35,18 +35,6 @@ function pipeline(...streams) {
});
}

function finished(stream, opts) {
return new Promise((resolve, reject) => {
eos(stream, opts, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}

module.exports = {
finished,
pipeline,
Expand Down
130 changes: 130 additions & 0 deletions test/parallel/test-stream-reduce.js
@@ -0,0 +1,130 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');

function sum(p, c) {
return p + c;
}

{
// Does the same thing as `(await stream.toArray()).reduce(...)`
(async () => {
const tests = [
[[], sum, 0],
[[1], sum, 0],
[[1, 2, 3, 4, 5], sum, 0],
[[...Array(100).keys()], sum, 0],
benjamingr marked this conversation as resolved.
Show resolved Hide resolved
[['a', 'b', 'c'], sum, ''],
[[1, 2], sum],
[[1, 2, 3], (x, y) => y],
];
for (const [values, fn, initial] of tests) {
const streamReduce = await Readable.from(values)
.reduce(fn, initial);
const arrayReduce = values.reduce(fn, initial);
assert.deepStrictEqual(streamReduce, arrayReduce);
}
// Does the same thing as `(await stream.toArray()).reduce(...)` with an
// asynchronous reducer
for (const [values, fn, initial] of tests) {
const streamReduce = await Readable.from(values)
.map(async (x) => x)
.reduce(fn, initial);
const arrayReduce = values.reduce(fn, initial);
assert.deepStrictEqual(streamReduce, arrayReduce);
}
})().then(common.mustCall());
}
{
// Works with an async reducer, with or without initial value
(async () => {
const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c, 0);
assert.strictEqual(six, 6);
})().then(common.mustCall());
(async () => {
const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c);
assert.strictEqual(six, 6);
})().then(common.mustCall());
}
{
// Works lazily
assert.rejects(Readable.from([1, 2, 3, 4, 5, 6])
.map(common.mustCall((x) => {
return x;
}, 3)) // Two consumed and one buffered by `map` due to default concurrency
.reduce(async (p, c) => {
if (p === 1) {
throw new Error('boom');
}
return c;
}, 0)
, /boom/).then(common.mustCall());
}

{
// Support for AbortSignal
const ac = new AbortController();
assert.rejects(async () => {
await Readable.from([1, 2, 3]).reduce(async (p, c) => {
if (c === 3) {
await new Promise(() => {}); // Explicitly do not pass signal here
}
return Promise.resolve();
}, 0, { signal: ac.signal });
}, {
name: 'AbortError',
}).then(common.mustCall());
ac.abort();
}


{
// Support for AbortSignal - pre aborted
const stream = Readable.from([1, 2, 3]);
assert.rejects(async () => {
await stream.reduce(async (p, c) => {
if (c === 3) {
await new Promise(() => {}); // Explicitly do not pass signal here
}
return Promise.resolve();
}, 0, { signal: AbortSignal.abort() });
}, {
name: 'AbortError',
}).then(common.mustCall(() => {
assert.strictEqual(stream.destroyed, true);
}));
}

{
// Support for AbortSignal - deep
const stream = Readable.from([1, 2, 3]);
assert.rejects(async () => {
await stream.reduce(async (p, c, { signal }) => {
signal.addEventListener('abort', common.mustCall(), { once: true });
if (c === 3) {
await new Promise(() => {}); // Explicitly do not pass signal here
}
return Promise.resolve();
}, 0, { signal: AbortSignal.abort() });
}, {
name: 'AbortError',
}).then(common.mustCall(() => {
assert.strictEqual(stream.destroyed, true);
}));
}

{
// Error cases
assert.rejects(() => Readable.from([]).reduce(1), /TypeError/);
assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/);
}

{
// Test result is a Promise
const result = Readable.from([1, 2, 3, 4, 5]).reduce(sum, 0);
assert.ok(result instanceof Promise);
}