From 06625ff0a6fa77990c576efc842b8c3a458329bd Mon Sep 17 00:00:00 2001 From: iMoses Date: Thu, 3 Feb 2022 12:55:05 +0200 Subject: [PATCH] stream: use synchronous error validation & validate abort signal option made sure top level methods aren't async/generators so that validation errors could be caught synchronously also added validation for the abort signal option PR-URL: https://github.com/nodejs/node/pull/41777 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Nitzan Uziely --- lib/internal/streams/operators.js | 68 +++++++++++++++++--- test/parallel/test-stream-asIndexedPairs.mjs | 8 ++- test/parallel/test-stream-drop-take.js | 6 ++ test/parallel/test-stream-flatMap.js | 1 + test/parallel/test-stream-map.js | 1 + test/parallel/test-stream-reduce.js | 2 + test/parallel/test-stream-some-every.js | 11 ++++ test/parallel/test-stream-toArray.js | 12 ++++ 8 files changed, 98 insertions(+), 11 deletions(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 707112f75e6c84..6dd6e015eed8d3 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -10,7 +10,10 @@ const { }, AbortError, } = require('internal/errors'); -const { validateInteger } = require('internal/validators'); +const { + validateAbortSignal, + validateInteger, +} = require('internal/validators'); const { kWeakHandler } = require('internal/event_target'); const { finished } = require('internal/streams/end-of-stream'); @@ -33,10 +36,12 @@ function map(fn, options) { throw new ERR_INVALID_ARG_TYPE( 'fn', ['Function', 'AsyncFunction'], fn); } - if (options != null && typeof options !== 'object') { throw new ERR_INVALID_ARG_TYPE('options', ['Object']); } + if (options?.signal != null) { + validateAbortSignal(options.signal, 'options.signal'); + } let concurrency = 1; if (options?.concurrency != null) { @@ -161,17 +166,33 @@ function map(fn, options) { }.call(this); } -async function* asIndexedPairs(options) { - let index = 0; - for await (const val of this) { - if (options?.signal?.aborted) { - throw new AbortError({ cause: options.signal.reason }); - } - yield [index++, val]; +function asIndexedPairs(options) { + if (options != null && typeof options !== 'object') { + throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + } + if (options?.signal != null) { + validateAbortSignal(options.signal, 'options.signal'); } + + return async function* asIndexedPairs() { + let index = 0; + for await (const val of this) { + if (options?.signal?.aborted) { + throw new AbortError({ cause: options.signal.reason }); + } + yield [index++, val]; + } + }.call(this); } async function some(fn, options) { + if (options != null && typeof options !== 'object') { + throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + } + if (options?.signal != null) { + validateAbortSignal(options.signal, 'options.signal'); + } + // https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some // Note that some does short circuit but also closes the iterator if it does const ac = new AbortController(); @@ -246,6 +267,13 @@ async function reduce(reducer, initialValue, options) { throw new ERR_INVALID_ARG_TYPE( 'reducer', ['Function', 'AsyncFunction'], reducer); } + if (options != null && typeof options !== 'object') { + throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + } + if (options?.signal != null) { + validateAbortSignal(options.signal, 'options.signal'); + } + let hasInitialValue = arguments.length > 1; if (options?.signal?.aborted) { const err = new AbortError(undefined, { cause: options.signal.reason }); @@ -283,6 +311,13 @@ async function reduce(reducer, initialValue, options) { } async function toArray(options) { + if (options != null && typeof options !== 'object') { + throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + } + if (options?.signal != null) { + validateAbortSignal(options.signal, 'options.signal'); + } + const result = []; for await (const val of this) { if (options?.signal?.aborted) { @@ -316,6 +351,13 @@ function toIntegerOrInfinity(number) { } function drop(number, options) { + if (options != null && typeof options !== 'object') { + throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + } + if (options?.signal != null) { + validateAbortSignal(options.signal, 'options.signal'); + } + number = toIntegerOrInfinity(number); return async function* drop() { if (options?.signal?.aborted) { @@ -332,8 +374,14 @@ function drop(number, options) { }.call(this); } - function take(number, options) { + if (options != null && typeof options !== 'object') { + throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + } + if (options?.signal != null) { + validateAbortSignal(options.signal, 'options.signal'); + } + number = toIntegerOrInfinity(number); return async function* take() { if (options?.signal?.aborted) { diff --git a/test/parallel/test-stream-asIndexedPairs.mjs b/test/parallel/test-stream-asIndexedPairs.mjs index 382ec7a8af04b6..6f577caefd28dc 100644 --- a/test/parallel/test-stream-asIndexedPairs.mjs +++ b/test/parallel/test-stream-asIndexedPairs.mjs @@ -1,6 +1,6 @@ import '../common/index.mjs'; import { Readable } from 'stream'; -import { deepStrictEqual, rejects } from 'assert'; +import { deepStrictEqual, rejects, throws } from 'assert'; { // asIndexedPairs with a synchronous stream @@ -45,3 +45,9 @@ import { deepStrictEqual, rejects } from 'assert'; await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray(); }, /AbortError/); } + +{ + // Error cases + throws(() => Readable.from([1]).asIndexedPairs(1), /ERR_INVALID_ARG_TYPE/); + throws(() => Readable.from([1]).asIndexedPairs({ signal: true }), /ERR_INVALID_ARG_TYPE/); +} diff --git a/test/parallel/test-stream-drop-take.js b/test/parallel/test-stream-drop-take.js index ddeb6054a78164..cb55a4f7ee1813 100644 --- a/test/parallel/test-stream-drop-take.js +++ b/test/parallel/test-stream-drop-take.js @@ -93,4 +93,10 @@ const naturals = () => from(async function*() { for (const example of invalidArgs) { throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/); } + + throws(() => Readable.from([1]).drop(1, 1), /ERR_INVALID_ARG_TYPE/); + throws(() => Readable.from([1]).drop(1, { signal: true }), /ERR_INVALID_ARG_TYPE/); + + throws(() => Readable.from([1]).take(1, 1), /ERR_INVALID_ARG_TYPE/); + throws(() => Readable.from([1]).take(1, { signal: true }), /ERR_INVALID_ARG_TYPE/); } diff --git a/test/parallel/test-stream-flatMap.js b/test/parallel/test-stream-flatMap.js index 952043b07b35f7..cd5e0c8a9c403e 100644 --- a/test/parallel/test-stream-flatMap.js +++ b/test/parallel/test-stream-flatMap.js @@ -114,6 +114,7 @@ function oneTo5() { concurrency: 'Foo' }), /ERR_OUT_OF_RANGE/); assert.throws(() => Readable.from([1]).flatMap((x) => x, 1), /ERR_INVALID_ARG_TYPE/); + assert.throws(() => Readable.from([1]).flatMap((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/); } { // Test result is a Readable diff --git a/test/parallel/test-stream-map.js b/test/parallel/test-stream-map.js index 24da9419f3950f..ba0571fe3a7b95 100644 --- a/test/parallel/test-stream-map.js +++ b/test/parallel/test-stream-map.js @@ -180,6 +180,7 @@ const { setTimeout } = require('timers/promises'); concurrency: 'Foo' }), /ERR_OUT_OF_RANGE/); assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/); + assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/); } { // Test result is a Readable diff --git a/test/parallel/test-stream-reduce.js b/test/parallel/test-stream-reduce.js index a8b41efa28415d..56271c5e232627 100644 --- a/test/parallel/test-stream-reduce.js +++ b/test/parallel/test-stream-reduce.js @@ -121,6 +121,8 @@ function sum(p, c) { // Error cases assert.rejects(() => Readable.from([]).reduce(1), /TypeError/); assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/); + assert.rejects(() => Readable.from([]).reduce((x, y) => x + y, 0, 1), /ERR_INVALID_ARG_TYPE/); + assert.rejects(() => Readable.from([]).reduce((x, y) => x + y, 0, { signal: true }), /ERR_INVALID_ARG_TYPE/); } { diff --git a/test/parallel/test-stream-some-every.js b/test/parallel/test-stream-some-every.js index c2be5ea955bcd2..91733613049121 100644 --- a/test/parallel/test-stream-some-every.js +++ b/test/parallel/test-stream-some-every.js @@ -87,6 +87,17 @@ function oneTo5Async() { assert.rejects(async () => { await Readable.from([1]).every(1); }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); + + assert.rejects(async () => { + await Readable.from([1]).every((x) => x, 1); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); + + assert.rejects(async () => { + await Readable.from([1]).every((x) => x, { + signal: true + }); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); + assert.rejects(async () => { await Readable.from([1]).every((x) => x, { concurrency: 'Foo' diff --git a/test/parallel/test-stream-toArray.js b/test/parallel/test-stream-toArray.js index 85e21edf97e3e5..5c86410ed74c09 100644 --- a/test/parallel/test-stream-toArray.js +++ b/test/parallel/test-stream-toArray.js @@ -79,3 +79,15 @@ const assert = require('assert'); const result = Readable.from([1, 2, 3, 4, 5]).toArray(); assert.strictEqual(result instanceof Promise, true); } +{ + // Error cases + assert.rejects(async () => { + await Readable.from([1]).toArray(1); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); + + assert.rejects(async () => { + await Readable.from([1]).toArray({ + signal: true + }); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); +}