diff --git a/doc/api/stream.md b/doc/api/stream.md index d2e72c3fba405a..ad08625fd6368c 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1751,7 +1751,7 @@ added: REPLACEME * `signal` {AbortSignal} aborted if the stream is destroyed allowing to abort the `fn` call early. * `options` {Object} - * `concurrency` {number} the maximal concurrent invocation of `fn` to call + * `concurrency` {number} the maximum concurrent invocation of `fn` to call on the stream at once. **Default:** `1`. * `signal` {AbortSignal} allows destroying the stream if the signal is aborted. @@ -1795,7 +1795,7 @@ added: REPLACEME * `signal` {AbortSignal} aborted if the stream is destroyed allowing to abort the `fn` call early. * `options` {Object} - * `concurrency` {number} the maximal concurrent invocation of `fn` to call + * `concurrency` {number} the maximum concurrent invocation of `fn` to call on the stream at once. **Default:** `1`. * `signal` {AbortSignal} allows destroying the stream if the signal is aborted. @@ -1844,7 +1844,7 @@ added: REPLACEME * `signal` {AbortSignal} aborted if the stream is destroyed allowing to abort the `fn` call early. * `options` {Object} - * `concurrency` {number} the maximal concurrent invocation of `fn` to call + * `concurrency` {number} the maximum concurrent invocation of `fn` to call on the stream at once. **Default:** `1`. * `signal` {AbortSignal} allows destroying the stream if the signal is aborted. @@ -1854,11 +1854,11 @@ This method allows iterating a stream. For each item in the stream the `fn` function will be called. If the `fn` function returns a promise - that promise will be `await`ed. -This method is different from `for... await` loops in that it supports setting -the maximal concurrent invocation of `fn` through the `concurrency` option. It -is also possible to `break` from a `for... await` destroying the stream but -`forEach` is only breakable by passing it a `signal` and aborting the related -`AbortController`. +This method is different from `for await...of` loops in that it can optionally +process items concurrently. In addition, a `forEach` iteration can only be +stopped by having passed a `signal` option and aborting the related +`AbortController` while `for await...of` can be stopped with `break` or +`return`. In either case the stream will be destroyed. This method is different from listening to the [`'data'`][] event in that it uses the [`readable`][] event in the underlying machinary and can limit the diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 3df9b3f4f4fb0a..c9581f7b6dfe6c 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -23,7 +23,7 @@ const kEof = Symbol('kEof'); async function * map(fn, options) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE( - 'fn', ['Function', 'AsyncFunction'], this); + 'fn', ['Function', 'AsyncFunction'], fn); } if (options != null && typeof options !== 'object') { @@ -150,7 +150,7 @@ async function * map(fn, options) { async function forEach(fn, options) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE( - 'fn', ['Function', 'AsyncFunction'], this); + 'fn', ['Function', 'AsyncFunction'], fn); } async function forEachFn(value, options) { await fn(value, options); @@ -163,7 +163,7 @@ async function forEach(fn, options) { async function * filter(fn, options) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE( - 'fn', ['Function', 'AsyncFunction'], this); + 'fn', ['Function', 'AsyncFunction'], fn); } async function filterFn(value, options) { if (await fn(value, options)) { @@ -174,8 +174,11 @@ async function * filter(fn, options) { yield* this.map(filterFn, options); } -module.exports = { +module.exports.streamReturningOperators = { filter, - forEach, map, }; + +module.exports.promiseReturningOperators = { + forEach, +}; diff --git a/lib/stream.js b/lib/stream.js index 25dd9f0ba52985..2c3261123af66e 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -31,7 +31,10 @@ const { promisify: { custom: customPromisify }, } = require('internal/util'); -const operators = require('internal/streams/operators'); +const { + streamReturningOperators, + promiseReturningOperators, +} = require('internal/streams/operators'); const compose = require('internal/streams/compose'); const { pipeline } = require('internal/streams/pipeline'); const { destroyer } = require('internal/streams/destroy'); @@ -46,12 +49,18 @@ Stream.isDisturbed = utils.isDisturbed; Stream.isErrored = utils.isErrored; Stream.isReadable = utils.isReadable; Stream.Readable = require('internal/streams/readable'); -for (const key of ObjectKeys(operators)) { - const op = operators[key]; +for (const key of ObjectKeys(streamReturningOperators)) { + const op = streamReturningOperators[key]; Stream.Readable.prototype[key] = function(...args) { return Stream.Readable.from(ReflectApply(op, this, args)); }; } +for (const key of ObjectKeys(promiseReturningOperators)) { + const op = promiseReturningOperators[key]; + Stream.Readable.prototype[key] = function(...args) { + return ReflectApply(op, this, args); + }; +} Stream.Writable = require('internal/streams/writable'); Stream.Duplex = require('internal/streams/duplex'); Stream.Transform = require('internal/streams/transform'); diff --git a/test/parallel/test-stream-forEach.js b/test/parallel/test-stream-forEach.js index 16040ad66c0d73..82013554cd2aa8 100644 --- a/test/parallel/test-stream-forEach.js +++ b/test/parallel/test-stream-forEach.js @@ -68,15 +68,15 @@ const { setTimeout } = require('timers/promises'); { // Error cases assert.rejects(async () => { - Readable.from([1]).forEach(1); + await Readable.from([1]).forEach(1); }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); assert.rejects(async () => { - Readable.from([1]).forEach((x) => x, { + await Readable.from([1]).forEach((x) => x, { concurrency: 'Foo' }); }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); assert.rejects(async () => { - Readable.from([1]).forEach((x) => x, 1); + await Readable.from([1]).forEach((x) => x, 1); }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); } {