core review + tests
benjamingr committed Jan 9, 2022
1 parent f14286f commit 013df4b
Showing 4 changed files with 31 additions and 19 deletions.
16 changes: 8 additions & 8 deletions doc/api/stream.md
@@ -1751,7 +1751,7 @@ added: REPLACEME
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
@@ -1795,7 +1795,7 @@ added: REPLACEME
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
@@ -1844,7 +1844,7 @@ added: REPLACEME
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
@@ -1854,11 +1854,11 @@ This method allows iterating a stream. For each item in the stream the
`fn` function will be called. If the `fn` function returns a promise - that
promise will be `await`ed.

This method is different from `for... await` loops in that it supports setting
the maximal concurrent invocation of `fn` through the `concurrency` option. It
is also possible to `break` from a `for... await` destroying the stream but
`forEach` is only breakable by passing it a `signal` and aborting the related
`AbortController`.
This method is different from `for await...of` loops in that it can optionally
process items concurrently. In addition, a `forEach` iteration can only be
stopped by having passed a `signal` option and aborting the related
`AbortController` while `for await...of` can be stopped with `break` or
`return`. In either case the stream will be destroyed.

This method is different from listening to the [`'data'`][] event in that it
uses the [`readable`][] event in the underlying machinery and can limit the
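
For reference, the API this stream.md hunk documents — `readable.forEach(fn[, options])` with a `concurrency` limit and an `AbortSignal` — can be exercised roughly as follows. This is a hedged usage sketch, not part of the commit; the delay value is arbitrary and the feature was still unreleased (`added: REPLACEME`) at this point.

```mjs
import { Readable } from 'stream';
import { setTimeout } from 'timers/promises';

const ac = new AbortController();

// Process up to two items at a time. Calling ac.abort() would destroy the
// stream and reject the promise returned by forEach().
await Readable.from([1, 2, 3, 4]).forEach(async (item, { signal }) => {
  // `signal` is aborted if the stream is destroyed, so long-running work
  // inside `fn` can bail out early.
  await setTimeout(10, undefined, { signal });
  console.log(item);
}, { concurrency: 2, signal: ac.signal });
```

As the documentation change above notes, there is no `break` here: stopping the iteration early goes through the `AbortController`.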
13 changes: 8 additions & 5 deletions lib/internal/streams/operators.js
@@ -23,7 +23,7 @@ const kEof = Symbol('kEof');
async function * map(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], this);
'fn', ['Function', 'AsyncFunction'], fn);
}

if (options != null && typeof options !== 'object') {
@@ -150,7 +150,7 @@ async function * map(fn, options) {
async function forEach(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], this);
'fn', ['Function', 'AsyncFunction'], fn);
}
async function forEachFn(value, options) {
await fn(value, options);
@@ -163,7 +163,7 @@ async function forEach(fn, options) {
async function * filter(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], this);
'fn', ['Function', 'AsyncFunction'], fn);
}
async function filterFn(value, options) {
if (await fn(value, options)) {
@@ -174,8 +174,11 @@ async function * filter(fn, options) {
yield* this.map(filterFn, options);
}

module.exports = {
module.exports.streamReturningOperators = {
filter,
forEach,
map,
};

module.exports.promiseReturningOperators = {
forEach,
};
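
Aside from the export split, the shape of operators.js here is that `map` is the async-generator primitive and `filter`/`forEach` are defined on top of it. The standalone sketch below is a loose approximation only — the real internals add argument validation, `AbortSignal` plumbing and the `concurrency` option, and handle dropped values differently; the `kSkip` sentinel is invented for the sketch. The functions are meant to be invoked with `this` bound to an async-iterable stream (e.g. via `Function.prototype.call` or by attaching them to `Readable.prototype`).

```js
const kSkip = Symbol('kSkip'); // sentinel meaning "drop this value" (sketch-only)

async function* map(fn, options) {
  for await (const value of this) {
    const result = await fn(value, options);
    if (result !== kSkip) yield result;
  }
}

async function* filter(fn, options) {
  yield* map.call(this, async (value, opts) =>
    (await fn(value, opts)) ? value : kSkip, options);
}

async function forEach(fn, options) {
  // Consume the mapped stream purely for fn's side effects; the caller
  // gets a promise rather than a stream.
  for await (const unused of map.call(this, fn, options)) { /* no-op */ }
}

// Mirroring the export split in the diff: one group gets wrapped in
// Readable.from() by lib/stream.js, the other is returned directly.
const streamReturningOperators = { map, filter };
const promiseReturningOperators = { forEach };
```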
15 changes: 12 additions & 3 deletions lib/stream.js
@@ -31,7 +31,10 @@ const {
promisify: { custom: customPromisify },
} = require('internal/util');

const operators = require('internal/streams/operators');
const {
streamReturningOperators,
promiseReturningOperators,
} = require('internal/streams/operators');
const compose = require('internal/streams/compose');
const { pipeline } = require('internal/streams/pipeline');
const { destroyer } = require('internal/streams/destroy');
@@ -46,12 +49,18 @@ Stream.isDisturbed = utils.isDisturbed;
Stream.isErrored = utils.isErrored;
Stream.isReadable = utils.isReadable;
Stream.Readable = require('internal/streams/readable');
for (const key of ObjectKeys(operators)) {
const op = operators[key];
for (const key of ObjectKeys(streamReturningOperators)) {
const op = streamReturningOperators[key];
Stream.Readable.prototype[key] = function(...args) {
return Stream.Readable.from(ReflectApply(op, this, args));
};
}
for (const key of ObjectKeys(promiseReturningOperators)) {
const op = promiseReturningOperators[key];
Stream.Readable.prototype[key] = function(...args) {
return ReflectApply(op, this, args);
};
}
Stream.Writable = require('internal/streams/writable');
Stream.Duplex = require('internal/streams/duplex');
Stream.Transform = require('internal/streams/transform');
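
The practical upshot of the split in lib/stream.js: anything in `streamReturningOperators` is wrapped in `Readable.from`, so it stays chainable, while anything in `promiseReturningOperators` is returned as-is and terminates the chain. A small, hedged illustration, assuming the operators behave as documented in the stream.md hunk above:

```mjs
import { Readable } from 'stream';

// map() and filter() come back as Readables, so they chain;
// forEach() comes back as a plain promise, so it ends the chain.
await Readable.from([1, 2, 3, 4])
  .map((x) => x * 2)                 // Readable
  .filter((x) => x > 4)              // Readable
  .forEach((x) => console.log(x));   // Promise<void> -> logs 6, then 8
```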
6 changes: 3 additions & 3 deletions test/parallel/test-stream-forEach.js
@@ -68,15 +68,15 @@ const { setTimeout } = require('timers/promises');
{
// Error cases
assert.rejects(async () => {
Readable.from([1]).forEach(1);
await Readable.from([1]).forEach(1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
Readable.from([1]).forEach((x) => x, {
await Readable.from([1]).forEach((x) => x, {
concurrency: 'Foo'
});
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
assert.rejects(async () => {
Readable.from([1]).forEach((x) => x, 1);
await Readable.from([1]).forEach((x) => x, 1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
}
{
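
The test fix above comes down to `assert.rejects` semantics: it only observes rejections of the promise returned by its callback, so the inner stream call has to be `await`ed (or returned). A hedged illustration with a hypothetical `mightReject()` helper:

```mjs
import assert from 'assert';

// Hypothetical stand-in for a call that returns a rejecting promise.
const mightReject = () => Promise.reject(new Error('boom'));

// Wrong (shown commented out): the async callback resolves before
// mightReject() settles, so assert.rejects reports a missing rejection and
// the real error escapes as an unhandled rejection.
// assert.rejects(async () => { mightReject(); }, /boom/);

// Right: awaiting ties the rejection to the callback's returned promise.
await assert.rejects(async () => { await mightReject(); }, /boom/);
console.log('rejection observed');
```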
