diff --git a/doc/api/stream.md b/doc/api/stream.md index fccd138f15c57d..bf8b70b3728010 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1972,6 +1972,55 @@ console.log(allBigFiles); console.log('done'); // Stream has finished ``` +### `readable.flatMap(fn[, options])` + + + +> Stability: 1 - Experimental + +* `fn` {Function|AsyncGeneratorFunction|AsyncFunction} a function to map over + every item in the stream. + * `data` {any} a chunk of data from the stream. + * `options` {Object} + * `signal` {AbortSignal} aborted if the stream is destroyed allowing to + abort the `fn` call early. +* `options` {Object} + * `concurrency` {number} the maximum concurrent invocation of `fn` to call + on the stream at once. **Default:** `1`. + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Readable} a stream flat-mapped with the function `fn`. + +This method returns a new stream by applying the given callback to each +chunk of the stream and then flattening the result. + +It is possible to return a stream or another iterable or async iterable from +`fn` and the result streams will be merged (flattened) into the returned +stream. + +```mjs +import { Readable } from 'stream'; +import { createReadStream } from 'fs'; + +// With a synchronous mapper. +for await (const item of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) { + console.log(item); // 1, 1, 2, 2, 3, 3, 4, 4 +} +// With an asynchronous mapper, combine the contents of 4 files +const concatResult = Readable.from([ + './1.mjs', + './2.mjs', + './3.mjs', + './4.mjs', +]).flatMap((fileName) => createReadStream(fileName)); +for await (const result of concatResult) { + // This will contain the contents (all chunks) of all 4 files + console.log(result); +} +``` + ### Duplex and transform streams #### Class: `stream.Duplex` diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 9c50865f3da3be..1218942b6414e4 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -229,8 +229,16 @@ async function toArray(options) { } return result; } + +async function* flatMap(fn, options) { + for await (const val of this.map(fn, options)) { + yield* val; + } +} + module.exports.streamReturningOperators = { filter, + flatMap, map, }; diff --git a/test/parallel/test-stream-flatMap.js b/test/parallel/test-stream-flatMap.js new file mode 100644 index 00000000000000..1ace8a0cf513bc --- /dev/null +++ b/test/parallel/test-stream-flatMap.js @@ -0,0 +1,131 @@ +'use strict'; + +const common = require('../common'); +const fixtures = require('../common/fixtures'); +const { + Readable, +} = require('stream'); +const assert = require('assert'); +const { setTimeout } = require('timers/promises'); +const { createReadStream } = require('fs'); + +function oneTo5() { + return Readable.from([1, 2, 3, 4, 5]); +} + +{ + // flatMap works on synchronous streams with a synchronous mapper + (async () => { + assert.deepStrictEqual( + await oneTo5().flatMap((x) => [x + x]).toArray(), + [2, 4, 6, 8, 10] + ); + assert.deepStrictEqual( + await oneTo5().flatMap(() => []).toArray(), + [] + ); + assert.deepStrictEqual( + await oneTo5().flatMap((x) => [x, x]).toArray(), + [1, 1, 2, 2, 3, 3, 4, 4, 5, 5] + ); + })().then(common.mustCall()); +} + + +{ + // flatMap works on sync/async streams with an asynchronous mapper + (async () => { + assert.deepStrictEqual( + await oneTo5().flatMap(async (x) => [x, x]).toArray(), + [1, 1, 2, 2, 3, 3, 4, 4, 5, 5] + ); + const asyncOneTo5 = oneTo5().map(async (x) => x); + assert.deepStrictEqual( + await asyncOneTo5.flatMap(async (x) => [x, x]).toArray(), + [1, 1, 2, 2, 3, 3, 4, 4, 5, 5] + ); + })().then(common.mustCall()); +} +{ + // flatMap works on a stream where mapping returns a stream + (async () => { + const result = await oneTo5().flatMap(async (x) => { + return Readable.from([x, x]); + }).toArray(); + assert.deepStrictEqual(result, [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]); + })().then(common.mustCall()); + // flatMap works on an objectMode stream where mappign returns a stream + (async () => { + const result = await oneTo5().flatMap(() => { + return createReadStream(fixtures.path('x.txt')); + }).toArray(); + // The resultant stream is in object mode so toArray shouldn't flatten + assert.strictEqual(result.length, 5); + assert.deepStrictEqual( + Buffer.concat(result).toString(), + 'xyz\n'.repeat(5) + ); + + })().then(common.mustCall()); + +} + +{ + // Concurrency + AbortSignal + const ac = new AbortController(); + const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => { + await setTimeout(100, { signal }); + }), { signal: ac.signal, concurrency: 2 }); + // pump + assert.rejects(async () => { + for await (const item of stream) { + // nope + console.log(item); + } + }, { + name: 'AbortError', + }).then(common.mustCall()); + + queueMicrotask(() => { + ac.abort(); + }); +} + +{ + // Already aborted AbortSignal + const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => { + await setTimeout(100, { signal }); + }), { signal: AbortSignal.abort() }); + // pump + assert.rejects(async () => { + for await (const item of stream) { + // nope + console.log(item); + } + }, { + name: 'AbortError', + }).then(common.mustCall()); +} + +{ + // Error cases + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const unused of Readable.from([1]).flatMap(1)); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of Readable.from([1]).flatMap((x) => x, { + concurrency: 'Foo' + })); + }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of Readable.from([1]).flatMap((x) => x, 1)); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); +} +{ + // Test result is a Readable + const stream = oneTo5().flatMap((x) => x); + assert.strictEqual(stream.readable, true); +}