From f159202dc4b09f37377b891e50233c9daa67b42f Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Mon, 17 Jan 2022 20:21:51 +0200 Subject: [PATCH] stream: support some and every This continues on the iterator-helpers work by adding `.some` and `.every` to readable streams. Co-Authored-By: Robert Nagy --- doc/api/stream.md | 98 +++++++++++++++++++++++- lib/internal/streams/operators.js | 37 +++++++++ test/parallel/test-stream-some-every.js | 99 +++++++++++++++++++++++++ 3 files changed, 233 insertions(+), 1 deletion(-) create mode 100644 test/parallel/test-stream-some-every.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 6f0cb937d5fb7b..ead69a0b86bacf 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1918,7 +1918,7 @@ import { Resolver } from 'dns/promises'; await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4] // Make dns queries concurrently using .map and collect -// the results into an aray using toArray +// the results into an array using toArray const dnsResults = await Readable.from([ 'nodejs.org', 'openjsf.org', @@ -1929,6 +1929,102 @@ const dnsResults = await Readable.from([ }, { concurrency: 2 }).toArray(); ``` +### `readable.some(fn[, options])` + + + +> Stability: 1 - Experimental + +* `fn` {Function|AsyncFunction} a function to call on each item of the stream. + * `data` {any} a chunk of data from the stream. + * `options` {Object} + * `signal` {AbortSignal} aborted if the stream is destroyed allowing to + abort the `fn` call early. +* `options` {Object} + * `concurrency` {number} the maximum concurrent invocation of `fn` to call + on the stream at once. **Default:** `1`. + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy + value for some of the chunks. + +This method is similar to `Array.prototype.some` and calls `fn` on each chunk +in the stream until one item returns true (or any truthy value). Once an `fn` +call on a chunk returns a truthy value the stream is destroyed and the promise +is fulfilled with `true`. If none of the `fn` calls on the chunks return a +truthy value the promise is fulfilled with `false`. + +```mjs +import { Readable } from 'stream'; +import { stat } from 'fs/promises'; + +// With a synchronous predicate. +await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true +await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false + +// With an asynchronous predicate, making at most 2 file checks at a time. +const anyBigFile = await Readable.from([ + 'file1', + 'file2', + 'file3', +]).some(async (fileName) => { + const stats = await stat(fileName); + return stat.size > 1024 * 1024; +}, { concurrency: 2 }); +console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB +console.log('done'); // Stream has finished +``` + +### `readable.every(fn[, options])` + + + +> Stability: 1 - Experimental + +* `fn` {Function|AsyncFunction} a function to call on each item of the stream. + * `data` {any} a chunk of data from the stream. + * `options` {Object} + * `signal` {AbortSignal} aborted if the stream is destroyed allowing to + abort the `fn` call early. +* `options` {Object} + * `concurrency` {number} the maximum concurrent invocation of `fn` to call + on the stream at once. **Default:** `1`. + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy + value for all of the chunks. + +This method is similar to `Array.prototype.every` and calls `fn` on each chunk +in the stream to check if they all return a truthy value for `fn`. Once an `fn` +call on a chunk returns a falsy value the stream is destroyed and the promise +is fulfilled with `false`. If all of the `fn` calls on the chunks return a +truthy value the promise is fulfilled with `true`. + +```mjs +import { Readable } from 'stream'; +import { stat } from 'fs/promises'; + +// With a synchronous predicate. +await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false +await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true + +// With an asynchronous predicate, making at most 2 file checks at a time. +const allBigFiles = await Readable.from([ + 'file1', + 'file2', + 'file3', +]).every(async (fileName) => { + const stats = await stat(fileName); + return stat.size > 1024 * 1024; +}, { concurrency: 2 }); +console.log(anyBigFile); // `true` if all files in the list are bigger than 1MiB +console.log('done'); // Stream has finished +``` + ### Duplex and transform streams #### Class: `stream.Duplex` diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 2649966fd403ac..5a4475fdd382f2 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -10,6 +10,7 @@ const { AbortError, } = require('internal/errors'); const { validateInteger } = require('internal/validators'); +const { kWeakHandler } = require('internal/event_target'); const { ArrayPrototypePush, @@ -150,6 +151,40 @@ async function * map(fn, options) { } } +async function some(fn, options) { + // https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some + // Note that some does short circuit but also closes the iterator if it does + const ac = new AbortController(); + if (options?.signal) { + if (options.signal.aborted) { + ac.abort(); + } + options.signal.addEventListener('abort', () => ac.abort(), { + [kWeakHandler]: this, + once: true, + }); + } + const mapped = this.map(fn, { ...options, signal: ac.signal }); + for await (const result of mapped) { + if (result) { + ac.abort(); + return true; + } + } + return false; +} + +async function every(fn, options) { + if (typeof fn !== 'function') { + throw new ERR_INVALID_ARG_TYPE( + 'fn', ['Function', 'AsyncFunction'], fn); + } + // https://en.wikipedia.org/wiki/De_Morgan%27s_laws + return !(await some.call(this, async (x) => { + return !(await fn(x)); + }, options)); +} + async function forEach(fn, options) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE( @@ -196,6 +231,8 @@ module.exports.streamReturningOperators = { }; module.exports.promiseReturningOperators = { + every, forEach, toArray, + some, }; diff --git a/test/parallel/test-stream-some-every.js b/test/parallel/test-stream-some-every.js new file mode 100644 index 00000000000000..7ec649e3b34306 --- /dev/null +++ b/test/parallel/test-stream-some-every.js @@ -0,0 +1,99 @@ +'use strict'; + +const common = require('../common'); +const { + Readable, +} = require('stream'); +const assert = require('assert'); + +function oneTo5() { + return Readable.from([1, 2, 3, 4, 5]); +} + +function oneTo5Async() { + return oneTo5().map(async (x) => { + await Promise.resolve(); + return x; + }); +} +{ + // Some and every work with a synchronous stream and predicate + (async () => { + assert.strictEqual(await oneTo5().some((x) => x > 3), true); + assert.strictEqual(await oneTo5().every((x) => x > 3), false); + assert.strictEqual(await oneTo5().some((x) => x > 6), false); + assert.strictEqual(await oneTo5().every((x) => x < 6), true); + assert.strictEqual(await Readable.from([]).some((x) => true), false); + assert.strictEqual(await Readable.from([]).every((x) => true), true); + })().then(common.mustCall()); +} + +{ + // Some and every work with an asynchronous stream and synchronous predicate + (async () => { + assert.strictEqual(await oneTo5Async().some((x) => x > 3), true); + assert.strictEqual(await oneTo5Async().every((x) => x > 3), false); + assert.strictEqual(await oneTo5Async().some((x) => x > 6), false); + assert.strictEqual(await oneTo5Async().every((x) => x < 6), true); + })().then(common.mustCall()); +} + +{ + // Some and every work on asynchronous streams with an asynchronous predicate + (async () => { + assert.strictEqual(await oneTo5().some(async (x) => x > 3), true); + assert.strictEqual(await oneTo5().every(async (x) => x > 3), false); + assert.strictEqual(await oneTo5().some(async (x) => x > 6), false); + assert.strictEqual(await oneTo5().every(async (x) => x < 6), true); + })().then(common.mustCall()); +} + +{ + // Some and every short circuit + (async () => { + await oneTo5().some(common.mustCall((x) => x > 2, 3)); + await oneTo5().every(common.mustCall((x) => x < 3, 3)); + // When short circuit isn't possible the whole stream is iterated + await oneTo5().some(common.mustCall((x) => x > 6, 5)); + // The stream is destroyed afterwards + const stream = oneTo5(); + await stream.some(common.mustCall((x) => x > 2, 3)); + assert.strictEqual(stream.destroyed, true); + })().then(common.mustCall()); +} + +{ + // Support for AbortSignal + const ac = new AbortController(); + assert.rejects(async () => { + await Readable.from([1, 2, 3]).some( + async (x) => new Promise(() => {}), + { signal: ac.signal }); + }, { + name: 'AbortError', + }); + ac.abort(); +} +{ + // Support for pre-aborted AbortSignal + const ac = new AbortController(); + ac.abort(); + assert.rejects(async () => { + await Readable.from([1, 2, 3]).some( + async (x) => new Promise(() => {}), + { signal: ac.signal }); + }, { + name: 'AbortError', + }); +} +{ + // Error cases + assert.rejects(async () => { + await Readable.from([1]).every(1); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); + assert.rejects(async () => { + await Readable.from([1]).every((x) => x, { + concurrency: 'Foo' + }); + }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); +}