From 0c690456cfd87ff83cc662340a32a23f48fa77ff Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Fri, 11 Feb 2022 20:21:42 +0200 Subject: [PATCH] stream: add more forEach tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-URL: https://github.com/nodejs/node/pull/41937 Reviewed-By: Matteo Collina Reviewed-By: Robert Nagy Reviewed-By: Tobias Nießen Reviewed-By: Mestery Reviewed-By: James M Snell Reviewed-By: Antoine du Hamel --- test/parallel/test-stream-forEach.js | 49 ++++++++++++++++++++++++++-- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/test/parallel/test-stream-forEach.js b/test/parallel/test-stream-forEach.js index 82013554cd2aa8..ae9dfd431f9f37 100644 --- a/test/parallel/test-stream-forEach.js +++ b/test/parallel/test-stream-forEach.js @@ -5,7 +5,7 @@ const { Readable, } = require('stream'); const assert = require('assert'); -const { setTimeout } = require('timers/promises'); +const { once } = require('events'); { // forEach works on synchronous streams with a synchronous predicate @@ -43,6 +43,51 @@ const { setTimeout } = require('timers/promises'); })().then(common.mustCall()); } +{ + // forEach works on an infinite stream + const ac = new AbortController(); + const { signal } = ac; + const stream = Readable.from(async function* () { + while (true) yield 1; + }(), { signal }); + let i = 0; + assert.rejects(stream.forEach(common.mustCall((x) => { + i++; + if (i === 10) ac.abort(); + assert.strictEqual(x, 1); + }, 10)), { name: 'AbortError' }).then(common.mustCall()); +} + +{ + // Emitting an error during `forEach` + const stream = Readable.from([1, 2, 3, 4, 5]); + assert.rejects(stream.forEach(async (x) => { + if (x === 3) { + stream.emit('error', new Error('boom')); + } + }), /boom/).then(common.mustCall()); +} + +{ + // Throwing an error during `forEach` (sync) + const stream = Readable.from([1, 2, 3, 4, 5]); + assert.rejects(stream.forEach((x) => { + if (x === 3) { + throw new Error('boom'); + } + }), /boom/).then(common.mustCall()); +} + +{ + // Throwing an error during `forEach` (async) + const stream = Readable.from([1, 2, 3, 4, 5]); + assert.rejects(stream.forEach(async (x) => { + if (x === 3) { + return Promise.reject(new Error('boom')); + } + }), /boom/).then(common.mustCall()); +} + { // Concurrency + AbortSignal const ac = new AbortController(); @@ -50,7 +95,7 @@ const { setTimeout } = require('timers/promises'); const forEachPromise = Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => { calls++; - await setTimeout(100, { signal }); + await once(signal, 'abort'); }, { signal: ac.signal, concurrency: 2 }); // pump assert.rejects(async () => {