Skip to content

Commit

Permalink
stream: use synchronous error validation on iteration helpers
Browse files Browse the repository at this point in the history
 is no longer a generator function,
instead it returns a called generator so that validation can be
synchronous and not wait for the first iteration

Fixes: #41648

PR-URL: #41652
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
  • Loading branch information
iMoses authored and danielleadams committed Apr 21, 2022
1 parent 25109a6 commit 4b63439
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 141 deletions.
203 changes: 104 additions & 99 deletions lib/internal/streams/operators.js
Expand Up @@ -27,7 +27,7 @@ const {
const kEmpty = Symbol('kEmpty');
const kEof = Symbol('kEof');

async function * map(fn, options) {
function map(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
Expand All @@ -44,118 +44,120 @@ async function * map(fn, options) {

validateInteger(concurrency, 'concurrency', 1);

const ac = new AbortController();
const stream = this;
const queue = [];
const signal = ac.signal;
const signalOpt = { signal };

const abort = () => ac.abort();
if (options?.signal?.aborted) {
abort();
}

options?.signal?.addEventListener('abort', abort);

let next;
let resume;
let done = false;

function onDone() {
done = true;
}
return async function* map() {
const ac = new AbortController();
const stream = this;
const queue = [];
const signal = ac.signal;
const signalOpt = { signal };

async function pump() {
try {
for await (let val of stream) {
if (done) {
return;
}
const abort = () => ac.abort();
if (options?.signal?.aborted) {
abort();
}

if (signal.aborted) {
throw new AbortError();
}
options?.signal?.addEventListener('abort', abort);

try {
val = fn(val, signalOpt);
} catch (err) {
val = PromiseReject(err);
}
let next;
let resume;
let done = false;

if (val === kEmpty) {
continue;
}
function onDone() {
done = true;
}

if (typeof val?.catch === 'function') {
val.catch(onDone);
async function pump() {
try {
for await (let val of stream) {
if (done) {
return;
}

if (signal.aborted) {
throw new AbortError();
}

try {
val = fn(val, signalOpt);
} catch (err) {
val = PromiseReject(err);
}

if (val === kEmpty) {
continue;
}

if (typeof val?.catch === 'function') {
val.catch(onDone);
}

queue.push(val);
if (next) {
next();
next = null;
}

if (!done && queue.length && queue.length >= concurrency) {
await new Promise((resolve) => {
resume = resolve;
});
}
}

queue.push(kEof);
} catch (err) {
const val = PromiseReject(err);
PromisePrototypeCatch(val, onDone);
queue.push(val);
} finally {
done = true;
if (next) {
next();
next = null;
}

if (!done && queue.length && queue.length >= concurrency) {
await new Promise((resolve) => {
resume = resolve;
});
}
}
queue.push(kEof);
} catch (err) {
const val = PromiseReject(err);
PromisePrototypeCatch(val, onDone);
queue.push(val);
} finally {
done = true;
if (next) {
next();
next = null;
options?.signal?.removeEventListener('abort', abort);
}
options?.signal?.removeEventListener('abort', abort);
}
}

pump();

try {
while (true) {
while (queue.length > 0) {
const val = await queue[0];

if (val === kEof) {
return;
}

if (signal.aborted) {
throw new AbortError();
}
pump();

if (val !== kEmpty) {
yield val;
try {
while (true) {
while (queue.length > 0) {
const val = await queue[0];

if (val === kEof) {
return;
}

if (signal.aborted) {
throw new AbortError();
}

if (val !== kEmpty) {
yield val;
}

queue.shift();
if (resume) {
resume();
resume = null;
}
}

queue.shift();
if (resume) {
resume();
resume = null;
}
await new Promise((resolve) => {
next = resolve;
});
}
} finally {
ac.abort();

await new Promise((resolve) => {
next = resolve;
});
}
} finally {
ac.abort();

done = true;
if (resume) {
resume();
resume = null;
done = true;
if (resume) {
resume();
resume = null;
}
}
}
}.call(this);
}

async function* asIndexedPairs(options) {
Expand Down Expand Up @@ -215,7 +217,7 @@ async function forEach(fn, options) {
for await (const unused of this.map(forEachFn, options));
}

async function * filter(fn, options) {
function filter(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
Expand All @@ -226,7 +228,7 @@ async function * filter(fn, options) {
}
return kEmpty;
}
yield* this.map(filterFn, options);
return this.map(filterFn, options);
}

async function toArray(options) {
Expand All @@ -243,10 +245,13 @@ async function toArray(options) {
return result;
}

async function* flatMap(fn, options) {
for await (const val of this.map(fn, options)) {
yield* val;
}
function flatMap(fn, options) {
const values = this.map(fn, options);
return async function* flatMap() {
for await (const val of values) {
yield* val;
}
}.call(this);
}

function toIntegerOrInfinity(number) {
Expand Down
19 changes: 5 additions & 14 deletions test/parallel/test-stream-filter.js
Expand Up @@ -87,20 +87,11 @@ const { setTimeout } = require('timers/promises');

{
// Error cases
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const unused of Readable.from([1]).filter(1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).filter((x) => x, {
concurrency: 'Foo'
}));
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).filter((x) => x, 1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.throws(() => Readable.from([1]).filter(1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).filter((x) => x, {
concurrency: 'Foo'
}), /ERR_OUT_OF_RANGE/);
assert.throws(() => Readable.from([1]).filter((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
}
{
// Test result is a Readable
Expand Down
19 changes: 5 additions & 14 deletions test/parallel/test-stream-flatMap.js
Expand Up @@ -109,20 +109,11 @@ function oneTo5() {

{
// Error cases
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const unused of Readable.from([1]).flatMap(1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).flatMap((x) => x, {
concurrency: 'Foo'
}));
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).flatMap((x) => x, 1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.throws(() => Readable.from([1]).flatMap(1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).flatMap((x) => x, {
concurrency: 'Foo'
}), /ERR_OUT_OF_RANGE/);
assert.throws(() => Readable.from([1]).flatMap((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
}
{
// Test result is a Readable
Expand Down
19 changes: 5 additions & 14 deletions test/parallel/test-stream-map.js
Expand Up @@ -86,20 +86,11 @@ const { setTimeout } = require('timers/promises');

{
// Error cases
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const unused of Readable.from([1]).map(1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).map((x) => x, {
concurrency: 'Foo'
}));
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).map((x) => x, 1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).map((x) => x, {
concurrency: 'Foo'
}), /ERR_OUT_OF_RANGE/);
assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
}
{
// Test result is a Readable
Expand Down

0 comments on commit 4b63439

Please sign in to comment.