diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 707112f75e6c84..855d095e62b42d 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -10,7 +10,10 @@ const { }, AbortError, } = require('internal/errors'); -const { validateInteger } = require('internal/validators'); +const { + validateAbortSignal, + validateInteger, +} = require('internal/validators'); const { kWeakHandler } = require('internal/event_target'); const { finished } = require('internal/streams/end-of-stream'); @@ -38,6 +41,10 @@ function map(fn, options) { throw new ERR_INVALID_ARG_TYPE('options', ['Object']); } + if (options?.signal != null) { + validateAbortSignal(options.signal, 'options.signal'); + } + let concurrency = 1; if (options?.concurrency != null) { concurrency = MathFloor(options.concurrency); @@ -161,17 +168,35 @@ function map(fn, options) { }.call(this); } -async function* asIndexedPairs(options) { - let index = 0; - for await (const val of this) { - if (options?.signal?.aborted) { - throw new AbortError({ cause: options.signal.reason }); - } - yield [index++, val]; +function asIndexedPairs(options) { + if (options != null && typeof options !== 'object') { + throw new ERR_INVALID_ARG_TYPE('options', ['Object']); } + + if (options?.signal != null) { + validateAbortSignal(options.signal, 'options.signal'); + } + + return async function* asIndexedPairs() { + let index = 0; + for await (const val of this) { + if (options?.signal?.aborted) { + throw new AbortError({ cause: options.signal.reason }); + } + yield [index++, val]; + } + }.call(this); } -async function some(fn, options) { +function some(fn, options) { + if (options != null && typeof options !== 'object') { + throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + } + + if (options?.signal != null) { + validateAbortSignal(options.signal, 'options.signal'); + } + // https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some // Note that some does short circuit but also closes the iterator if it does const ac = new AbortController(); @@ -185,27 +210,32 @@ async function some(fn, options) { }); } const mapped = this.map(fn, { ...options, signal: ac.signal }); - for await (const result of mapped) { - if (result) { - ac.abort(); - return true; + return async function some() { + for await (const result of mapped) { + if (result) { + ac.abort(); + return true; + } } - } - return false; + return false; + }.call(this); } -async function every(fn, options) { +function every(fn, options) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE( 'fn', ['Function', 'AsyncFunction'], fn); } // https://en.wikipedia.org/wiki/De_Morgan%27s_laws - return !(await some.call(this, async (...args) => { + const somePromise = some.call(this, async (...args) => { return !(await fn(...args)); - }, options)); + }, options); + return async function every() { + return !(await somePromise); + }.call(this); } -async function forEach(fn, options) { +function forEach(fn, options) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE( 'fn', ['Function', 'AsyncFunction'], fn); @@ -214,8 +244,11 @@ async function forEach(fn, options) { await fn(value, options); return kEmpty; } - // eslint-disable-next-line no-unused-vars - for await (const unused of this.map(forEachFn, options)); + const values = this.map(forEachFn, options); + return async function forEach() { + // eslint-disable-next-line no-unused-vars + for await (const unused of values); + }.call(this); } function filter(fn, options) { @@ -241,56 +274,78 @@ class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS { } } -async function reduce(reducer, initialValue, options) { +function reduce(reducer, initialValue, options) { if (typeof reducer !== 'function') { throw new ERR_INVALID_ARG_TYPE( 'reducer', ['Function', 'AsyncFunction'], reducer); } - let hasInitialValue = arguments.length > 1; - if (options?.signal?.aborted) { - const err = new AbortError(undefined, { cause: options.signal.reason }); - this.once('error', () => {}); // The error is already propagated - await finished(this.destroy(err)); - throw err; + + if (options != null && typeof options !== 'object') { + throw new ERR_INVALID_ARG_TYPE('options', ['Object']); } - const ac = new AbortController(); - const signal = ac.signal; - if (options?.signal) { - const opts = { once: true, [kWeakHandler]: this }; - options.signal.addEventListener('abort', () => ac.abort(), opts); + + if (options?.signal != null) { + validateAbortSignal(options.signal, 'options.signal'); } - let gotAnyItemFromStream = false; - try { - for await (const value of this) { - gotAnyItemFromStream = true; - if (options?.signal?.aborted) { - throw new AbortError(); + + let hasInitialValue = arguments.length > 1; + + return async function reduce() { + if (options?.signal?.aborted) { + const err = new AbortError(undefined, { cause: options.signal.reason }); + this.once('error', () => {}); // The error is already propagated + await finished(this.destroy(err)); + throw err; + } + const ac = new AbortController(); + const signal = ac.signal; + if (options?.signal) { + const opts = { once: true, [kWeakHandler]: this }; + options.signal.addEventListener('abort', () => ac.abort(), opts); + } + let gotAnyItemFromStream = false; + try { + for await (const value of this) { + gotAnyItemFromStream = true; + if (options?.signal?.aborted) { + throw new AbortError(); + } + if (!hasInitialValue) { + initialValue = value; + hasInitialValue = true; + } else { + initialValue = await reducer(initialValue, value, { signal }); + } } - if (!hasInitialValue) { - initialValue = value; - hasInitialValue = true; - } else { - initialValue = await reducer(initialValue, value, { signal }); + if (!gotAnyItemFromStream && !hasInitialValue) { + throw new ReduceAwareErrMissingArgs(); } + } finally { + ac.abort(); } - if (!gotAnyItemFromStream && !hasInitialValue) { - throw new ReduceAwareErrMissingArgs(); - } - } finally { - ac.abort(); - } - return initialValue; + return initialValue; + }.call(this); } -async function toArray(options) { - const result = []; - for await (const val of this) { - if (options?.signal?.aborted) { - throw new AbortError(undefined, { cause: options.signal.reason }); - } - ArrayPrototypePush(result, val); +function toArray(options) { + if (options != null && typeof options !== 'object') { + throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + } + + if (options?.signal != null) { + validateAbortSignal(options.signal, 'options.signal'); } - return result; + + return async function toArray() { + const result = []; + for await (const val of this) { + if (options?.signal?.aborted) { + throw new AbortError(undefined, { cause: options.signal.reason }); + } + ArrayPrototypePush(result, val); + } + return result; + }.call(this); } function flatMap(fn, options) { @@ -316,6 +371,14 @@ function toIntegerOrInfinity(number) { } function drop(number, options) { + if (options != null && typeof options !== 'object') { + throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + } + + if (options?.signal != null) { + validateAbortSignal(options.signal, 'options.signal'); + } + number = toIntegerOrInfinity(number); return async function* drop() { if (options?.signal?.aborted) { @@ -334,6 +397,14 @@ function drop(number, options) { function take(number, options) { + if (options != null && typeof options !== 'object') { + throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + } + + if (options?.signal != null) { + validateAbortSignal(options.signal, 'options.signal'); + } + number = toIntegerOrInfinity(number); return async function* take() { if (options?.signal?.aborted) { diff --git a/test/parallel/test-stream-asIndexedPairs.mjs b/test/parallel/test-stream-asIndexedPairs.mjs index 382ec7a8af04b6..6f577caefd28dc 100644 --- a/test/parallel/test-stream-asIndexedPairs.mjs +++ b/test/parallel/test-stream-asIndexedPairs.mjs @@ -1,6 +1,6 @@ import '../common/index.mjs'; import { Readable } from 'stream'; -import { deepStrictEqual, rejects } from 'assert'; +import { deepStrictEqual, rejects, throws } from 'assert'; { // asIndexedPairs with a synchronous stream @@ -45,3 +45,9 @@ import { deepStrictEqual, rejects } from 'assert'; await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray(); }, /AbortError/); } + +{ + // Error cases + throws(() => Readable.from([1]).asIndexedPairs(1), /ERR_INVALID_ARG_TYPE/); + throws(() => Readable.from([1]).asIndexedPairs({ signal: true }), /ERR_INVALID_ARG_TYPE/); +} diff --git a/test/parallel/test-stream-drop-take.js b/test/parallel/test-stream-drop-take.js index ddeb6054a78164..cb55a4f7ee1813 100644 --- a/test/parallel/test-stream-drop-take.js +++ b/test/parallel/test-stream-drop-take.js @@ -93,4 +93,10 @@ const naturals = () => from(async function*() { for (const example of invalidArgs) { throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/); } + + throws(() => Readable.from([1]).drop(1, 1), /ERR_INVALID_ARG_TYPE/); + throws(() => Readable.from([1]).drop(1, { signal: true }), /ERR_INVALID_ARG_TYPE/); + + throws(() => Readable.from([1]).take(1, 1), /ERR_INVALID_ARG_TYPE/); + throws(() => Readable.from([1]).take(1, { signal: true }), /ERR_INVALID_ARG_TYPE/); } diff --git a/test/parallel/test-stream-flatMap.js b/test/parallel/test-stream-flatMap.js index 952043b07b35f7..cd5e0c8a9c403e 100644 --- a/test/parallel/test-stream-flatMap.js +++ b/test/parallel/test-stream-flatMap.js @@ -114,6 +114,7 @@ function oneTo5() { concurrency: 'Foo' }), /ERR_OUT_OF_RANGE/); assert.throws(() => Readable.from([1]).flatMap((x) => x, 1), /ERR_INVALID_ARG_TYPE/); + assert.throws(() => Readable.from([1]).flatMap((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/); } { // Test result is a Readable diff --git a/test/parallel/test-stream-forEach.js b/test/parallel/test-stream-forEach.js index 82013554cd2aa8..7e52b747e2df04 100644 --- a/test/parallel/test-stream-forEach.js +++ b/test/parallel/test-stream-forEach.js @@ -67,17 +67,12 @@ const { setTimeout } = require('timers/promises'); { // Error cases - assert.rejects(async () => { - await Readable.from([1]).forEach(1); - }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); - assert.rejects(async () => { - await Readable.from([1]).forEach((x) => x, { - concurrency: 'Foo' - }); - }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); - assert.rejects(async () => { - await Readable.from([1]).forEach((x) => x, 1); - }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); + assert.throws(() => Readable.from([1]).forEach(1), /ERR_INVALID_ARG_TYPE/); + assert.throws(() => Readable.from([1]).forEach((x) => x, { + concurrency: 'Foo' + }), /ERR_OUT_OF_RANGE/); + assert.throws(() => Readable.from([1]).forEach((x) => x, 1), /ERR_INVALID_ARG_TYPE/); + assert.throws(() => Readable.from([1]).forEach((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/); } { // Test result is a Promise diff --git a/test/parallel/test-stream-map.js b/test/parallel/test-stream-map.js index 24da9419f3950f..ba0571fe3a7b95 100644 --- a/test/parallel/test-stream-map.js +++ b/test/parallel/test-stream-map.js @@ -180,6 +180,7 @@ const { setTimeout } = require('timers/promises'); concurrency: 'Foo' }), /ERR_OUT_OF_RANGE/); assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/); + assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/); } { // Test result is a Readable diff --git a/test/parallel/test-stream-reduce.js b/test/parallel/test-stream-reduce.js index a8b41efa28415d..f025ad1089065c 100644 --- a/test/parallel/test-stream-reduce.js +++ b/test/parallel/test-stream-reduce.js @@ -119,8 +119,11 @@ function sum(p, c) { { // Error cases - assert.rejects(() => Readable.from([]).reduce(1), /TypeError/); - assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/); + assert.throws(() => Readable.from([]).reduce(1), /TypeError/); + assert.throws(() => Readable.from([]).reduce('5'), /TypeError/); + + assert.throws(() => Readable.from([]).reduce((x, y) => x + y, 0, 1), /ERR_INVALID_ARG_TYPE/); + assert.throws(() => Readable.from([]).reduce((x, y) => x + y, 0, { signal: true }), /ERR_INVALID_ARG_TYPE/); } { diff --git a/test/parallel/test-stream-some-every.js b/test/parallel/test-stream-some-every.js index c2be5ea955bcd2..87715688d2e1d8 100644 --- a/test/parallel/test-stream-some-every.js +++ b/test/parallel/test-stream-some-every.js @@ -84,12 +84,17 @@ function oneTo5Async() { } { // Error cases - assert.rejects(async () => { - await Readable.from([1]).every(1); - }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); - assert.rejects(async () => { - await Readable.from([1]).every((x) => x, { - concurrency: 'Foo' - }); - }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); + assert.throws(() => Readable.from([1]).some(1), /ERR_INVALID_ARG_TYPE/); + assert.throws(() => Readable.from([1]).some((x) => x, { + concurrency: 'Foo' + }), /ERR_OUT_OF_RANGE/); + assert.throws(() => Readable.from([1]).some((x) => x, 1), /ERR_INVALID_ARG_TYPE/); + assert.throws(() => Readable.from([1]).some((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/); + + assert.throws(() => Readable.from([1]).every(1), /ERR_INVALID_ARG_TYPE/); + assert.throws(() => Readable.from([1]).every((x) => x, { + concurrency: 'Foo' + }), /ERR_OUT_OF_RANGE/); + assert.throws(() => Readable.from([1]).every((x) => x, 1), /ERR_INVALID_ARG_TYPE/); + assert.throws(() => Readable.from([1]).every((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/); } diff --git a/test/parallel/test-stream-toArray.js b/test/parallel/test-stream-toArray.js index 85e21edf97e3e5..d0dfe0d8c924cb 100644 --- a/test/parallel/test-stream-toArray.js +++ b/test/parallel/test-stream-toArray.js @@ -79,3 +79,8 @@ const assert = require('assert'); const result = Readable.from([1, 2, 3, 4, 5]).toArray(); assert.strictEqual(result instanceof Promise, true); } +{ + // Error cases + assert.throws(() => Readable.from([1]).toArray(1), /ERR_INVALID_ARG_TYPE/); + assert.throws(() => Readable.from([1]).toArray({ signal: true }), /ERR_INVALID_ARG_TYPE/); +}