diff --git a/test/parallel/test-stream-filter.js b/test/parallel/test-stream-filter.js index 16040ad66c0d73..f5e362d65fe353 100644 --- a/test/parallel/test-stream-filter.js +++ b/test/parallel/test-stream-filter.js @@ -8,38 +8,41 @@ const assert = require('assert'); const { setTimeout } = require('timers/promises'); { - // forEach works on synchronous streams with a synchronous predicate - const stream = Readable.from([1, 2, 3]); - const result = [1, 2, 3]; + // Filter works on synchronous streams with a synchronous predicate + const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => x < 3); + const result = [1, 2]; (async () => { - await stream.forEach((value) => assert.strictEqual(value, result.shift())); + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } })().then(common.mustCall()); } { - // forEach works an asynchronous streams - const stream = Readable.from([1, 2, 3]).filter(async (x) => { + // Filter works on synchronous streams with an asynchronous predicate + const stream = Readable.from([1, 2, 3, 4, 5]).filter(async (x) => { await Promise.resolve(); - return true; + return x > 3; }); - const result = [1, 2, 3]; + const result = [4, 5]; (async () => { - await stream.forEach((value) => assert.strictEqual(value, result.shift())); + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } })().then(common.mustCall()); } { - // forEach works on asynchronous streams with a asynchronous forEach fn - const stream = Readable.from([1, 2, 3]).filter(async (x) => { + // Map works on asynchronous streams with a asynchronous mapper + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { await Promise.resolve(); - return true; - }); - const result = [1, 2, 3]; + return x + x; + }).filter((x) => x > 5); + const result = [6, 8, 10]; (async () => { - await stream.forEach(async (value) => { - await Promise.resolve(); - assert.strictEqual(value, result.shift()); - }); + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } })().then(common.mustCall()); } @@ -47,14 +50,16 @@ const { setTimeout } = require('timers/promises'); // Concurrency + AbortSignal const ac = new AbortController(); let calls = 0; - const forEachPromise = - Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => { - calls++; - await setTimeout(100, { signal }); - }, { signal: ac.signal, concurrency: 2 }); + const stream = Readable.from([1, 2, 3, 4]).filter(async (_, { signal }) => { + calls++; + await setTimeout(100, { signal }); + }, { signal: ac.signal, concurrency: 2 }); // pump assert.rejects(async () => { - await forEachPromise; + for await (const item of stream) { + // nope + console.log(item); + } }, { name: 'AbortError', }).then(common.mustCall()); @@ -65,22 +70,40 @@ const { setTimeout } = require('timers/promises'); }); } +{ + // Concurrency result order + const stream = Readable.from([1, 2]).filter(async (item, { signal }) => { + await setTimeout(10 - item, { signal }); + return true; + }, { concurrency: 2 }); + + (async () => { + const expected = [1, 2]; + for await (const item of stream) { + assert.strictEqual(item, expected.shift()); + } + })().then(common.mustCall()); +} + { // Error cases assert.rejects(async () => { - Readable.from([1]).forEach(1); + // eslint-disable-next-line no-unused-vars + for await (const unused of Readable.from([1]).filter(1)); }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); assert.rejects(async () => { - Readable.from([1]).forEach((x) => x, { + // eslint-disable-next-line no-unused-vars + for await (const _ of Readable.from([1]).filter((x) => x, { concurrency: 'Foo' - }); + })); }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); assert.rejects(async () => { - Readable.from([1]).forEach((x) => x, 1); + // eslint-disable-next-line no-unused-vars + for await (const _ of Readable.from([1]).filter((x) => x, 1)); }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); } { - // Test result is a Promise - const stream = Readable.from([1, 2, 3, 4, 5]).forEach((_) => true); - assert.strictEqual(typeof stream.then, 'function'); -} + // Test result is a Readable + const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => true); + assert.strictEqual(stream.readable, true); +} \ No newline at end of file diff --git a/test/parallel/test-stream-forEach.js b/test/parallel/test-stream-forEach.js index 100921a766977e..16040ad66c0d73 100644 --- a/test/parallel/test-stream-forEach.js +++ b/test/parallel/test-stream-forEach.js @@ -8,41 +8,38 @@ const assert = require('assert'); const { setTimeout } = require('timers/promises'); { - // Filter works on synchronous streams with a synchronous predicate - const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => x < 3); - const result = [1, 2]; + // forEach works on synchronous streams with a synchronous predicate + const stream = Readable.from([1, 2, 3]); + const result = [1, 2, 3]; (async () => { - for await (const item of stream) { - assert.strictEqual(item, result.shift()); - } + await stream.forEach((value) => assert.strictEqual(value, result.shift())); })().then(common.mustCall()); } { - // Filter works on synchronous streams with an asynchronous predicate - const stream = Readable.from([1, 2, 3, 4, 5]).filter(async (x) => { + // forEach works an asynchronous streams + const stream = Readable.from([1, 2, 3]).filter(async (x) => { await Promise.resolve(); - return x > 3; + return true; }); - const result = [4, 5]; + const result = [1, 2, 3]; (async () => { - for await (const item of stream) { - assert.strictEqual(item, result.shift()); - } + await stream.forEach((value) => assert.strictEqual(value, result.shift())); })().then(common.mustCall()); } { - // Map works on asynchronous streams with a asynchronous mapper - const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { + // forEach works on asynchronous streams with a asynchronous forEach fn + const stream = Readable.from([1, 2, 3]).filter(async (x) => { await Promise.resolve(); - return x + x; - }).filter((x) => x > 5); - const result = [6, 8, 10]; + return true; + }); + const result = [1, 2, 3]; (async () => { - for await (const item of stream) { - assert.strictEqual(item, result.shift()); - } + await stream.forEach(async (value) => { + await Promise.resolve(); + assert.strictEqual(value, result.shift()); + }); })().then(common.mustCall()); } @@ -50,16 +47,14 @@ const { setTimeout } = require('timers/promises'); // Concurrency + AbortSignal const ac = new AbortController(); let calls = 0; - const stream = Readable.from([1, 2, 3, 4]).filter(async (_, { signal }) => { - calls++; - await setTimeout(100, { signal }); - }, { signal: ac.signal, concurrency: 2 }); + const forEachPromise = + Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => { + calls++; + await setTimeout(100, { signal }); + }, { signal: ac.signal, concurrency: 2 }); // pump assert.rejects(async () => { - for await (const item of stream) { - // nope - console.log(item); - } + await forEachPromise; }, { name: 'AbortError', }).then(common.mustCall()); @@ -70,40 +65,22 @@ const { setTimeout } = require('timers/promises'); }); } -{ - // Concurrency result order - const stream = Readable.from([1, 2]).filter(async (item, { signal }) => { - await setTimeout(10 - item, { signal }); - return true; - }, { concurrency: 2 }); - - (async () => { - const expected = [1, 2]; - for await (const item of stream) { - assert.strictEqual(item, expected.shift()); - } - })().then(common.mustCall()); -} - { // Error cases assert.rejects(async () => { - // eslint-disable-next-line no-unused-vars - for await (const unused of Readable.from([1]).filter(1)); + Readable.from([1]).forEach(1); }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); assert.rejects(async () => { - // eslint-disable-next-line no-unused-vars - for await (const _ of Readable.from([1]).filter((x) => x, { + Readable.from([1]).forEach((x) => x, { concurrency: 'Foo' - })); + }); }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); assert.rejects(async () => { - // eslint-disable-next-line no-unused-vars - for await (const _ of Readable.from([1]).filter((x) => x, 1)); + Readable.from([1]).forEach((x) => x, 1); }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); } { - // Test result is a Readable - const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => true); - assert.strictEqual(stream.readable, true); + // Test result is a Promise + const stream = Readable.from([1, 2, 3, 4, 5]).forEach((_) => true); + assert.strictEqual(typeof stream.then, 'function'); }