From 4965229d85ebc13dcfd51a48c25362027b9f0749 Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Fri, 11 Feb 2022 20:21:42 +0200 Subject: [PATCH 1/3] stream: add more forEach tests --- test/parallel/test-stream-forEach.js | 45 ++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/test/parallel/test-stream-forEach.js b/test/parallel/test-stream-forEach.js index e26310e3bccb85..f0d8a330363932 100644 --- a/test/parallel/test-stream-forEach.js +++ b/test/parallel/test-stream-forEach.js @@ -43,6 +43,51 @@ const { setTimeout } = require('timers/promises'); })().then(common.mustCall()); } +{ + // forEach works on an infinite stream + const ac = new AbortController(); + const { signal } = ac; + const stream = Readable.from(async function* () { + while (true) yield 1; + }(), { signal }); + let i = 0; + assert.rejects(stream.forEach(common.mustCall((x) => { + i++; + if (i === 10) ac.abort(); + assert.strictEqual(x, 1); + }, 10)), { name: 'AbortError' }); +} + +{ + // Emitting an error during `forEach` + const stream = Readable.from([1, 2, 3, 4, 5]); + assert.rejects(stream.forEach(async (x) => { + if (x === 3) { + stream.emit('error', new Error('boom')); + } + }), /boom/).then(common.mustCall()); +} + +{ + // Throwing an error during `forEach` (sync) + const stream = Readable.from([1, 2, 3, 4, 5]); + assert.rejects(stream.forEach((x) => { + if (x === 3) { + throw new Error('boom'); + } + }), /boom/).then(common.mustCall()); +} + +{ + // Throwing an error during `forEach` (async) + const stream = Readable.from([1, 2, 3, 4, 5]); + assert.rejects(stream.forEach(async (x) => { + if (x === 3) { + return Promise.reject(new Error('boom')); + } + }), /boom/).then(common.mustCall()); +} + { // Concurrency + AbortSignal const ac = new AbortController(); From 188068b940b02a634bf3ea473fbd2c2fa00c1df3 Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Sat, 12 Feb 2022 19:53:09 +0200 Subject: [PATCH 2/3] Update test/parallel/test-stream-forEach.js Co-authored-by: Antoine du Hamel --- test/parallel/test-stream-forEach.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/parallel/test-stream-forEach.js b/test/parallel/test-stream-forEach.js index f0d8a330363932..15eb03c018eca9 100644 --- a/test/parallel/test-stream-forEach.js +++ b/test/parallel/test-stream-forEach.js @@ -55,7 +55,7 @@ const { setTimeout } = require('timers/promises'); i++; if (i === 10) ac.abort(); assert.strictEqual(x, 1); - }, 10)), { name: 'AbortError' }); + }, 10)), { name: 'AbortError' }).then(common.mustCall()); } { From fa333dd467a41990064829af077df80f785d26de Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Mon, 14 Feb 2022 14:03:53 +0200 Subject: [PATCH 3/3] fixup! no timers --- test/parallel/test-stream-forEach.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/parallel/test-stream-forEach.js b/test/parallel/test-stream-forEach.js index 15eb03c018eca9..e3678352c41591 100644 --- a/test/parallel/test-stream-forEach.js +++ b/test/parallel/test-stream-forEach.js @@ -5,7 +5,7 @@ const { Readable, } = require('stream'); const assert = require('assert'); -const { setTimeout } = require('timers/promises'); +const { once } = require('events'); { // forEach works on synchronous streams with a synchronous predicate @@ -95,7 +95,7 @@ const { setTimeout } = require('timers/promises'); const forEachPromise = Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => { calls++; - await setTimeout(100, { signal }); + await once(signal, 'abort'); }, { signal: ac.signal, concurrency: 2 }); // pump assert.rejects(async () => {