Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: iterator helper drop take #41630

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 44 additions & 0 deletions doc/api/stream.md
Expand Up @@ -2074,6 +2074,50 @@ for await (const result of concatResult) {
}
```

### `readable.drop(limit[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `limit` {number} the number of chunks to drop from the readable.
* `options` {Object}
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream with `limit` chunks dropped.

This method returns a new stream with the first `limit` chunks dropped.

```mjs
import { Readable } from 'stream';

await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4]
```

### `readable.take(limit[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `limit` {number} the number of chunks to take from the readable.
* `options` {Object}
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream with `limit` chunks dropped.

This method returns a new stream with the first `limit` chunks dropped.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
This method returns a new stream with the first `limit` chunks dropped.
This method returns a new stream with the last `limit` chunks dropped.

Refs: #41669 (comment)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix when landing to avoid waiting for full CI on a docs change.


```mjs
import { Readable } from 'stream';

await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
55 changes: 55 additions & 0 deletions lib/internal/streams/operators.js
Expand Up @@ -5,6 +5,7 @@ const { AbortController } = require('internal/abort_controller');
const {
codes: {
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE,
},
AbortError,
} = require('internal/errors');
Expand All @@ -14,6 +15,8 @@ const { kWeakHandler } = require('internal/event_target');
const {
ArrayPrototypePush,
MathFloor,
Number,
NumberIsNaN,
Promise,
PromiseReject,
PromisePrototypeCatch,
Expand Down Expand Up @@ -232,10 +235,62 @@ async function* flatMap(fn, options) {
}
}

function toIntegerOrInfinity(number) {
// We coerce here to align with the spec
// https://github.com/tc39/proposal-iterator-helpers/issues/169
number = Number(number);
if (NumberIsNaN(number)) {
return 0;
}
if (number < 0) {
throw new ERR_OUT_OF_RANGE('number', '>= 0', number);
}
return number;
}

function drop(number, options) {
number = toIntegerOrInfinity(number);
return async function* drop() {
if (options?.signal?.aborted) {
throw new AbortError();
}
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError();
}
if (number-- <= 0) {
yield val;
}
}
}.call(this);
}


function take(number, options) {
number = toIntegerOrInfinity(number);
return async function* take() {
if (options?.signal?.aborted) {
throw new AbortError();
}
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError();
}
if (number-- > 0) {
yield val;
} else {
return;
}
}
}.call(this);
}

module.exports.streamReturningOperators = {
drop,
filter,
flatMap,
map,
take,
};

module.exports.promiseReturningOperators = {
Expand Down
96 changes: 96 additions & 0 deletions test/parallel/test-stream-drop-take.js
@@ -0,0 +1,96 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const { deepStrictEqual, rejects, throws } = require('assert');

const { from } = Readable;

const fromAsync = (...args) => from(...args).map(async (x) => x);

const naturals = () => from(async function*() {
let i = 1;
while (true) {
yield i++;
}
}());

{
// Synchronous streams
(async () => {
deepStrictEqual(await from([1, 2, 3]).drop(2).toArray(), [3]);
deepStrictEqual(await from([1, 2, 3]).take(1).toArray(), [1]);
deepStrictEqual(await from([]).drop(2).toArray(), []);
deepStrictEqual(await from([]).take(1).toArray(), []);
deepStrictEqual(await from([1, 2, 3]).drop(1).take(1).toArray(), [2]);
deepStrictEqual(await from([1, 2]).drop(0).toArray(), [1, 2]);
deepStrictEqual(await from([1, 2]).take(0).toArray(), []);
})().then(common.mustCall());
// Asynchronous streams
(async () => {
deepStrictEqual(await fromAsync([1, 2, 3]).drop(2).toArray(), [3]);
deepStrictEqual(await fromAsync([1, 2, 3]).take(1).toArray(), [1]);
deepStrictEqual(await fromAsync([]).drop(2).toArray(), []);
deepStrictEqual(await fromAsync([]).take(1).toArray(), []);
deepStrictEqual(await fromAsync([1, 2, 3]).drop(1).take(1).toArray(), [2]);
deepStrictEqual(await fromAsync([1, 2]).drop(0).toArray(), [1, 2]);
deepStrictEqual(await fromAsync([1, 2]).take(0).toArray(), []);
})().then(common.mustCall());
// Infinite streams
// Asynchronous streams
(async () => {
deepStrictEqual(await naturals().take(1).toArray(), [1]);
deepStrictEqual(await naturals().drop(1).take(1).toArray(), [2]);
const next10 = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20];
deepStrictEqual(await naturals().drop(10).take(10).toArray(), next10);
deepStrictEqual(await naturals().take(5).take(1).toArray(), [1]);
})().then(common.mustCall());
}

{
// Coercion
(async () => {
// The spec made me do this ^^
deepStrictEqual(await naturals().take('cat').toArray(), []);
deepStrictEqual(await naturals().take('2').toArray(), [1, 2]);
deepStrictEqual(await naturals().take(true).toArray(), [1]);
})().then(common.mustCall());
}

{
// Support for AbortSignal
const ac = new AbortController();
rejects(
Readable.from([1, 2, 3]).take(1, { signal: ac.signal }).toArray(), {
name: 'AbortError',
}).then(common.mustCall());
rejects(
Readable.from([1, 2, 3]).drop(1, { signal: ac.signal }).toArray(), {
name: 'AbortError',
}).then(common.mustCall());
ac.abort();
}

{
// Support for AbortSignal, already aborted
const signal = AbortSignal.abort();
rejects(
Readable.from([1, 2, 3]).take(1, { signal }).toArray(), {
name: 'AbortError',
}).then(common.mustCall());
}

{
// Error cases
const invalidArgs = [
-1,
-Infinity,
-40,
];

for (const example of invalidArgs) {
throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/);
}
}