Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add forEach method #41445

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 61 additions & 2 deletions doc/api/stream.md
Expand Up @@ -1751,7 +1751,7 @@ added: REPLACEME
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
Expand Down Expand Up @@ -1795,7 +1795,7 @@ added: REPLACEME
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
Expand Down Expand Up @@ -1830,6 +1830,65 @@ for await (const result of dnsResults) {
}
```

### `readable.forEach(fn[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Promise} a promise for when the stream has finished.

This method allows iterating a stream. For each item in the stream the
`fn` function will be called. If the `fn` function returns a promise - that
promise will be `await`ed.
benjamingr marked this conversation as resolved.
Show resolved Hide resolved

This method is different from `for await...of` loops in that it can optionally
process items concurrently. In addition, a `forEach` iteration can only be
stopped by having passed a `signal` option and aborting the related
`AbortController` while `for await...of` can be stopped with `break` or
`return`. In either case the stream will be destroyed.

This method is different from listening to the [`'data'`][] event in that it
uses the [`readable`][] event in the underlying machinary and can limit the
number of concurrent `fn` calls.

```mjs
import { Readable } from 'stream';
import { Resolver } from 'dns/promises';

// With a synchronous predicate.
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(item); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
// Logs result, similar to `for await (const result of dnsResults)`
console.log(result);
});
console.log('done'); // Stream has finished
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
28 changes: 23 additions & 5 deletions lib/internal/streams/operators.js
Expand Up @@ -23,7 +23,7 @@ const kEof = Symbol('kEof');
async function * map(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], this);
'fn', ['Function', 'AsyncFunction'], fn);
}

if (options != null && typeof options !== 'object') {
Expand Down Expand Up @@ -147,10 +147,23 @@ async function * map(fn, options) {
}
}

async function forEach(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
}
async function forEachFn(value, options) {
await fn(value, options);
return kEmpty;
}
// eslint-disable-next-line no-unused-vars
for await (const unused of this.map(forEachFn, options));
}

async function * filter(fn, options) {
if (typeof fn !== 'function') {
throw (new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], this));
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
}
async function filterFn(value, options) {
if (await fn(value, options)) {
Expand All @@ -160,7 +173,12 @@ async function * filter(fn, options) {
}
yield* this.map(filterFn, options);
}
module.exports = {

module.exports.streamReturningOperators = {
filter,
map,
filter
};

module.exports.promiseReturningOperators = {
forEach,
};
15 changes: 12 additions & 3 deletions lib/stream.js
Expand Up @@ -31,7 +31,10 @@ const {
promisify: { custom: customPromisify },
} = require('internal/util');

const operators = require('internal/streams/operators');
const {
streamReturningOperators,
promiseReturningOperators,
} = require('internal/streams/operators');
const compose = require('internal/streams/compose');
const { pipeline } = require('internal/streams/pipeline');
const { destroyer } = require('internal/streams/destroy');
Expand All @@ -46,12 +49,18 @@ Stream.isDisturbed = utils.isDisturbed;
Stream.isErrored = utils.isErrored;
Stream.isReadable = utils.isReadable;
Stream.Readable = require('internal/streams/readable');
for (const key of ObjectKeys(operators)) {
const op = operators[key];
for (const key of ObjectKeys(streamReturningOperators)) {
const op = streamReturningOperators[key];
Stream.Readable.prototype[key] = function(...args) {
return Stream.Readable.from(ReflectApply(op, this, args));
};
}
for (const key of ObjectKeys(promiseReturningOperators)) {
const op = promiseReturningOperators[key];
Stream.Readable.prototype[key] = function(...args) {
return ReflectApply(op, this, args);
};
}
Stream.Writable = require('internal/streams/writable');
Stream.Duplex = require('internal/streams/duplex');
Stream.Transform = require('internal/streams/transform');
Expand Down
86 changes: 86 additions & 0 deletions test/parallel/test-stream-forEach.js
@@ -0,0 +1,86 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');
const { setTimeout } = require('timers/promises');

{
// forEach works on synchronous streams with a synchronous predicate
const stream = Readable.from([1, 2, 3]);
const result = [1, 2, 3];
(async () => {
await stream.forEach((value) => assert.strictEqual(value, result.shift()));
})().then(common.mustCall());
}

{
// forEach works an asynchronous streams
const stream = Readable.from([1, 2, 3]).filter(async (x) => {
await Promise.resolve();
return true;
});
const result = [1, 2, 3];
(async () => {
await stream.forEach((value) => assert.strictEqual(value, result.shift()));
})().then(common.mustCall());
}

{
// forEach works on asynchronous streams with a asynchronous forEach fn
const stream = Readable.from([1, 2, 3]).filter(async (x) => {
await Promise.resolve();
return true;
});
const result = [1, 2, 3];
(async () => {
await stream.forEach(async (value) => {
await Promise.resolve();
assert.strictEqual(value, result.shift());
});
})().then(common.mustCall());
}

{
// Concurrency + AbortSignal
const ac = new AbortController();
let calls = 0;
const forEachPromise =
Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => {
calls++;
await setTimeout(100, { signal });
}, { signal: ac.signal, concurrency: 2 });
// pump
assert.rejects(async () => {
await forEachPromise;
}, {
name: 'AbortError',
}).then(common.mustCall());

setImmediate(() => {
ac.abort();
assert.strictEqual(calls, 2);
});
}

{
// Error cases
assert.rejects(async () => {
await Readable.from([1]).forEach(1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
await Readable.from([1]).forEach((x) => x, {
concurrency: 'Foo'
});
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
assert.rejects(async () => {
await Readable.from([1]).forEach((x) => x, 1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
}
{
// Test result is a Promise
const stream = Readable.from([1, 2, 3, 4, 5]).forEach((_) => true);
assert.strictEqual(typeof stream.then, 'function');
}