Skip to content

Commit

Permalink
stream: use synchronous error validation & validate abort signal option
Browse files Browse the repository at this point in the history
made sure top level methods aren't async/generators
so that validation errors could be caught synchronously
also added validation for the abort signal option

PR-URL: #41777
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Nitzan Uziely <linkgoron@gmail.com>
  • Loading branch information
iMoses committed Feb 3, 2022
1 parent 064783c commit 06625ff
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 11 deletions.
68 changes: 58 additions & 10 deletions lib/internal/streams/operators.js
Expand Up @@ -10,7 +10,10 @@ const {
},
AbortError,
} = require('internal/errors');
const { validateInteger } = require('internal/validators');
const {
validateAbortSignal,
validateInteger,
} = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');
const { finished } = require('internal/streams/end-of-stream');

Expand All @@ -33,10 +36,12 @@ function map(fn, options) {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
}

if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

let concurrency = 1;
if (options?.concurrency != null) {
Expand Down Expand Up @@ -161,17 +166,33 @@ function map(fn, options) {
}.call(this);
}

async function* asIndexedPairs(options) {
let index = 0;
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError({ cause: options.signal.reason });
}
yield [index++, val];
function asIndexedPairs(options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

return async function* asIndexedPairs() {
let index = 0;
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError({ cause: options.signal.reason });
}
yield [index++, val];
}
}.call(this);
}

async function some(fn, options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
// Note that some does short circuit but also closes the iterator if it does
const ac = new AbortController();
Expand Down Expand Up @@ -246,6 +267,13 @@ async function reduce(reducer, initialValue, options) {
throw new ERR_INVALID_ARG_TYPE(
'reducer', ['Function', 'AsyncFunction'], reducer);
}
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

let hasInitialValue = arguments.length > 1;
if (options?.signal?.aborted) {
const err = new AbortError(undefined, { cause: options.signal.reason });
Expand Down Expand Up @@ -283,6 +311,13 @@ async function reduce(reducer, initialValue, options) {
}

async function toArray(options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

const result = [];
for await (const val of this) {
if (options?.signal?.aborted) {
Expand Down Expand Up @@ -316,6 +351,13 @@ function toIntegerOrInfinity(number) {
}

function drop(number, options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

number = toIntegerOrInfinity(number);
return async function* drop() {
if (options?.signal?.aborted) {
Expand All @@ -332,8 +374,14 @@ function drop(number, options) {
}.call(this);
}


function take(number, options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

number = toIntegerOrInfinity(number);
return async function* take() {
if (options?.signal?.aborted) {
Expand Down
8 changes: 7 additions & 1 deletion test/parallel/test-stream-asIndexedPairs.mjs
@@ -1,6 +1,6 @@
import '../common/index.mjs';
import { Readable } from 'stream';
import { deepStrictEqual, rejects } from 'assert';
import { deepStrictEqual, rejects, throws } from 'assert';

{
// asIndexedPairs with a synchronous stream
Expand Down Expand Up @@ -45,3 +45,9 @@ import { deepStrictEqual, rejects } from 'assert';
await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray();
}, /AbortError/);
}

{
// Error cases
throws(() => Readable.from([1]).asIndexedPairs(1), /ERR_INVALID_ARG_TYPE/);
throws(() => Readable.from([1]).asIndexedPairs({ signal: true }), /ERR_INVALID_ARG_TYPE/);
}
6 changes: 6 additions & 0 deletions test/parallel/test-stream-drop-take.js
Expand Up @@ -93,4 +93,10 @@ const naturals = () => from(async function*() {
for (const example of invalidArgs) {
throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/);
}

throws(() => Readable.from([1]).drop(1, 1), /ERR_INVALID_ARG_TYPE/);
throws(() => Readable.from([1]).drop(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);

throws(() => Readable.from([1]).take(1, 1), /ERR_INVALID_ARG_TYPE/);
throws(() => Readable.from([1]).take(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}
1 change: 1 addition & 0 deletions test/parallel/test-stream-flatMap.js
Expand Up @@ -114,6 +114,7 @@ function oneTo5() {
concurrency: 'Foo'
}), /ERR_OUT_OF_RANGE/);
assert.throws(() => Readable.from([1]).flatMap((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).flatMap((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}
{
// Test result is a Readable
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-stream-map.js
Expand Up @@ -180,6 +180,7 @@ const { setTimeout } = require('timers/promises');
concurrency: 'Foo'
}), /ERR_OUT_OF_RANGE/);
assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}
{
// Test result is a Readable
Expand Down
2 changes: 2 additions & 0 deletions test/parallel/test-stream-reduce.js
Expand Up @@ -121,6 +121,8 @@ function sum(p, c) {
// Error cases
assert.rejects(() => Readable.from([]).reduce(1), /TypeError/);
assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/);
assert.rejects(() => Readable.from([]).reduce((x, y) => x + y, 0, 1), /ERR_INVALID_ARG_TYPE/);
assert.rejects(() => Readable.from([]).reduce((x, y) => x + y, 0, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}

{
Expand Down
11 changes: 11 additions & 0 deletions test/parallel/test-stream-some-every.js
Expand Up @@ -87,6 +87,17 @@ function oneTo5Async() {
assert.rejects(async () => {
await Readable.from([1]).every(1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());

assert.rejects(async () => {
await Readable.from([1]).every((x) => x, 1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());

assert.rejects(async () => {
await Readable.from([1]).every((x) => x, {
signal: true
});
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());

assert.rejects(async () => {
await Readable.from([1]).every((x) => x, {
concurrency: 'Foo'
Expand Down
12 changes: 12 additions & 0 deletions test/parallel/test-stream-toArray.js
Expand Up @@ -79,3 +79,15 @@ const assert = require('assert');
const result = Readable.from([1, 2, 3, 4, 5]).toArray();
assert.strictEqual(result instanceof Promise, true);
}
{
// Error cases
assert.rejects(async () => {
await Readable.from([1]).toArray(1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());

assert.rejects(async () => {
await Readable.from([1]).toArray({
signal: true
});
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
}

0 comments on commit 06625ff

Please sign in to comment.