From 88a48197c40901aff4d904717a1357483b370fef Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Fri, 21 Jan 2022 18:42:21 +0200 Subject: [PATCH] stream: add drop and take This adds the `drop` and `take` methods to readable streams allowing users easily drop and take items from the stream. This continues the iterator-helper proposal alignment task. Co-Authored-By: Robert Nagy PR-URL: https://github.com/nodejs/node/pull/41630 Reviewed-By: Robert Nagy Reviewed-By: Matteo Collina --- doc/api/stream.md | 44 ++++++++++++ lib/internal/streams/operators.js | 55 +++++++++++++++ test/parallel/test-stream-drop-take.js | 96 ++++++++++++++++++++++++++ 3 files changed, 195 insertions(+) create mode 100644 test/parallel/test-stream-drop-take.js diff --git a/doc/api/stream.md b/doc/api/stream.md index bf8b70b3728010..729a9cc1413d11 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2021,6 +2021,50 @@ for await (const result of concatResult) { } ``` +### `readable.drop(limit[, options])` + + + +> Stability: 1 - Experimental + +* `limit` {number} the number of chunks to drop from the readable. +* `options` {Object} + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Readable} a stream with `limit` chunks dropped. + +This method returns a new stream with the first `limit` chunks dropped. + +```mjs +import { Readable } from 'stream'; + +await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4] +``` + +### `readable.take(limit[, options])` + + + +> Stability: 1 - Experimental + +* `limit` {number} the number of chunks to take from the readable. +* `options` {Object} + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Readable} a stream with `limit` chunks taken. + +This method returns a new stream with the first `limit` chunks. + +```mjs +import { Readable } from 'stream'; + +await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2] +``` + ### Duplex and transform streams #### Class: `stream.Duplex` diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 1218942b6414e4..3c679b42e7127c 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -6,6 +6,7 @@ const { Buffer } = require('buffer'); const { codes: { ERR_INVALID_ARG_TYPE, + ERR_OUT_OF_RANGE, }, AbortError, } = require('internal/errors'); @@ -15,6 +16,8 @@ const { kWeakHandler } = require('internal/event_target'); const { ArrayPrototypePush, MathFloor, + Number, + NumberIsNaN, Promise, PromiseReject, PromisePrototypeCatch, @@ -236,10 +239,62 @@ async function* flatMap(fn, options) { } } +function toIntegerOrInfinity(number) { + // We coerce here to align with the spec + // https://github.com/tc39/proposal-iterator-helpers/issues/169 + number = Number(number); + if (NumberIsNaN(number)) { + return 0; + } + if (number < 0) { + throw new ERR_OUT_OF_RANGE('number', '>= 0', number); + } + return number; +} + +function drop(number, options) { + number = toIntegerOrInfinity(number); + return async function* drop() { + if (options?.signal?.aborted) { + throw new AbortError(); + } + for await (const val of this) { + if (options?.signal?.aborted) { + throw new AbortError(); + } + if (number-- <= 0) { + yield val; + } + } + }.call(this); +} + + +function take(number, options) { + number = toIntegerOrInfinity(number); + return async function* take() { + if (options?.signal?.aborted) { + throw new AbortError(); + } + for await (const val of this) { + if (options?.signal?.aborted) { + throw new AbortError(); + } + if (number-- > 0) { + yield val; + } else { + return; + } + } + }.call(this); +} + module.exports.streamReturningOperators = { + drop, filter, flatMap, map, + take, }; module.exports.promiseReturningOperators = { diff --git a/test/parallel/test-stream-drop-take.js b/test/parallel/test-stream-drop-take.js new file mode 100644 index 00000000000000..ddeb6054a78164 --- /dev/null +++ b/test/parallel/test-stream-drop-take.js @@ -0,0 +1,96 @@ +'use strict'; + +const common = require('../common'); +const { + Readable, +} = require('stream'); +const { deepStrictEqual, rejects, throws } = require('assert'); + +const { from } = Readable; + +const fromAsync = (...args) => from(...args).map(async (x) => x); + +const naturals = () => from(async function*() { + let i = 1; + while (true) { + yield i++; + } +}()); + +{ + // Synchronous streams + (async () => { + deepStrictEqual(await from([1, 2, 3]).drop(2).toArray(), [3]); + deepStrictEqual(await from([1, 2, 3]).take(1).toArray(), [1]); + deepStrictEqual(await from([]).drop(2).toArray(), []); + deepStrictEqual(await from([]).take(1).toArray(), []); + deepStrictEqual(await from([1, 2, 3]).drop(1).take(1).toArray(), [2]); + deepStrictEqual(await from([1, 2]).drop(0).toArray(), [1, 2]); + deepStrictEqual(await from([1, 2]).take(0).toArray(), []); + })().then(common.mustCall()); + // Asynchronous streams + (async () => { + deepStrictEqual(await fromAsync([1, 2, 3]).drop(2).toArray(), [3]); + deepStrictEqual(await fromAsync([1, 2, 3]).take(1).toArray(), [1]); + deepStrictEqual(await fromAsync([]).drop(2).toArray(), []); + deepStrictEqual(await fromAsync([]).take(1).toArray(), []); + deepStrictEqual(await fromAsync([1, 2, 3]).drop(1).take(1).toArray(), [2]); + deepStrictEqual(await fromAsync([1, 2]).drop(0).toArray(), [1, 2]); + deepStrictEqual(await fromAsync([1, 2]).take(0).toArray(), []); + })().then(common.mustCall()); + // Infinite streams + // Asynchronous streams + (async () => { + deepStrictEqual(await naturals().take(1).toArray(), [1]); + deepStrictEqual(await naturals().drop(1).take(1).toArray(), [2]); + const next10 = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]; + deepStrictEqual(await naturals().drop(10).take(10).toArray(), next10); + deepStrictEqual(await naturals().take(5).take(1).toArray(), [1]); + })().then(common.mustCall()); +} + +{ + // Coercion + (async () => { + // The spec made me do this ^^ + deepStrictEqual(await naturals().take('cat').toArray(), []); + deepStrictEqual(await naturals().take('2').toArray(), [1, 2]); + deepStrictEqual(await naturals().take(true).toArray(), [1]); + })().then(common.mustCall()); +} + +{ + // Support for AbortSignal + const ac = new AbortController(); + rejects( + Readable.from([1, 2, 3]).take(1, { signal: ac.signal }).toArray(), { + name: 'AbortError', + }).then(common.mustCall()); + rejects( + Readable.from([1, 2, 3]).drop(1, { signal: ac.signal }).toArray(), { + name: 'AbortError', + }).then(common.mustCall()); + ac.abort(); +} + +{ + // Support for AbortSignal, already aborted + const signal = AbortSignal.abort(); + rejects( + Readable.from([1, 2, 3]).take(1, { signal }).toArray(), { + name: 'AbortError', + }).then(common.mustCall()); +} + +{ + // Error cases + const invalidArgs = [ + -1, + -Infinity, + -40, + ]; + + for (const example of invalidArgs) { + throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/); + } +}