diff --git a/test/parallel/test-stream-filter.js b/test/parallel/test-stream-filter.js index 4eeb877980b971..e434eb8c84ef9b 100644 --- a/test/parallel/test-stream-filter.js +++ b/test/parallel/test-stream-filter.js @@ -5,6 +5,7 @@ const { Readable, } = require('stream'); const assert = require('assert'); +const { once } = require('events'); const { setTimeout } = require('timers/promises'); { @@ -46,13 +47,80 @@ const { setTimeout } = require('timers/promises'); })().then(common.mustCall()); } +{ + // Filter works on an infinite stream + const stream = Readable.from(async function* () { + while (true) yield 1; + }()).filter(common.mustCall(async (x) => { + return x < 3; + }, 5)); + (async () => { + let i = 1; + for await (const item of stream) { + assert.strictEqual(item, 1); + if (++i === 5) break; + } + })().then(common.mustCall()); +} + +{ + // Filter works on constructor created streams + let i = 0; + const stream = new Readable({ + read() { + if (i === 10) { + this.push(null); + return; + } + this.push(Uint8Array.from([i])); + i++; + }, + highWaterMark: 0, + }).filter(common.mustCall(async ([x]) => { + return x !== 5; + }, 10)); + (async () => { + const result = (await stream.toArray()).map((x) => x[0]); + const expected = [...Array(10).keys()].filter((x) => x !== 5); + assert.deepStrictEqual(result, expected); + })().then(common.mustCall()); +} + +{ + // Throwing an error during `filter` (sync) + const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => { + if (x === 3) { + throw new Error('boom'); + } + return true; + }); + assert.rejects( + stream.map((x) => x + x).toArray(), + /boom/, + ).then(common.mustCall()); +} + +{ + // Throwing an error during `filter` (async) + const stream = Readable.from([1, 2, 3, 4, 5]).filter(async (x) => { + if (x === 3) { + throw new Error('boom'); + } + return true; + }); + assert.rejects( + stream.filter(() => true).toArray(), + /boom/, + ).then(common.mustCall()); +} + { // Concurrency + AbortSignal const ac = new AbortController(); let calls = 0; const stream = Readable.from([1, 2, 3, 4]).filter(async (_, { signal }) => { calls++; - await setTimeout(100, { signal }); + await once(signal, 'abort'); }, { signal: ac.signal, concurrency: 2 }); // pump assert.rejects(async () => {