diff --git a/doc/api/stream.md b/doc/api/stream.md index 4398b9f71e416c..9c2175fb181261 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2064,6 +2064,30 @@ import { Readable } from 'stream'; await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2] ``` +### `readable.asIndexedPairs([options])` + + + +> Stability: 1 - Experimental + +* `options` {Object} + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Readable} a stream of indexed pairs. + +This method returns a new stream with chunks of the underlying stream paired +with a counter in the form `[index, chunk]`. The first index value is 0 and it +increases by 1 for each chunk produced. + +```mjs +import { Readable } from 'stream'; + +const pairs = await Readable.from(['a', 'b', 'c']).asIndexedPairs().toArray(); +console.log(pairs); // [[0, 'a'], [1, 'b'], [2, 'c']] +``` + ### Duplex and transform streams #### Class: `stream.Duplex` diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index a1190fabcf8c00..1bc81204c58b28 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -157,6 +157,16 @@ async function * map(fn, options) { } } +async function* asIndexedPairs(options) { + let index = 0; + for await (const val of this) { + if (options?.signal?.aborted) { + throw new AbortError({ cause: options.signal.reason }); + } + yield [index++, val]; + } +} + async function some(fn, options) { // https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some // Note that some does short circuit but also closes the iterator if it does @@ -286,6 +296,7 @@ function take(number, options) { } module.exports.streamReturningOperators = { + asIndexedPairs, drop, filter, flatMap, diff --git a/test/parallel/test-stream-asIndexedPairs.mjs b/test/parallel/test-stream-asIndexedPairs.mjs new file mode 100644 index 00000000000000..382ec7a8af04b6 --- /dev/null +++ b/test/parallel/test-stream-asIndexedPairs.mjs @@ -0,0 +1,47 @@ +import '../common/index.mjs'; +import { Readable } from 'stream'; +import { deepStrictEqual, rejects } from 'assert'; + +{ + // asIndexedPairs with a synchronous stream + const pairs = await Readable.from([1, 2, 3]).asIndexedPairs().toArray(); + deepStrictEqual(pairs, [[0, 1], [1, 2], [2, 3]]); + const empty = await Readable.from([]).asIndexedPairs().toArray(); + deepStrictEqual(empty, []); +} + +{ + // asIndexedPairs works an asynchronous streams + const asyncFrom = (...args) => Readable.from(...args).map(async (x) => x); + const pairs = await asyncFrom([1, 2, 3]).asIndexedPairs().toArray(); + deepStrictEqual(pairs, [[0, 1], [1, 2], [2, 3]]); + const empty = await asyncFrom([]).asIndexedPairs().toArray(); + deepStrictEqual(empty, []); +} + +{ + // Does not enumerate an infinite stream + const infinite = () => Readable.from(async function* () { + while (true) yield 1; + }()); + const pairs = await infinite().asIndexedPairs().take(3).toArray(); + deepStrictEqual(pairs, [[0, 1], [1, 1], [2, 1]]); + const empty = await infinite().asIndexedPairs().take(0).toArray(); + deepStrictEqual(empty, []); +} + +{ + // AbortSignal + await rejects(async () => { + const ac = new AbortController(); + const { signal } = ac; + const p = Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray(); + ac.abort(); + await p; + }, { name: 'AbortError' }); + + await rejects(async () => { + const signal = AbortSignal.abort(); + await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray(); + }, /AbortError/); +}