diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 781e7a2a670195..0c55e9ad758fa0 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -27,7 +27,7 @@ const { const kEmpty = Symbol('kEmpty'); const kEof = Symbol('kEof'); -async function * map(fn, options) { +function map(fn, options) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE( 'fn', ['Function', 'AsyncFunction'], fn); @@ -44,118 +44,120 @@ async function * map(fn, options) { validateInteger(concurrency, 'concurrency', 1); - const ac = new AbortController(); - const stream = this; - const queue = []; - const signal = ac.signal; - const signalOpt = { signal }; - - const abort = () => ac.abort(); - if (options?.signal?.aborted) { - abort(); - } - - options?.signal?.addEventListener('abort', abort); - - let next; - let resume; - let done = false; - - function onDone() { - done = true; - } + return async function* map() { + const ac = new AbortController(); + const stream = this; + const queue = []; + const signal = ac.signal; + const signalOpt = { signal }; - async function pump() { - try { - for await (let val of stream) { - if (done) { - return; - } + const abort = () => ac.abort(); + if (options?.signal?.aborted) { + abort(); + } - if (signal.aborted) { - throw new AbortError(); - } + options?.signal?.addEventListener('abort', abort); - try { - val = fn(val, signalOpt); - } catch (err) { - val = PromiseReject(err); - } + let next; + let resume; + let done = false; - if (val === kEmpty) { - continue; - } + function onDone() { + done = true; + } - if (typeof val?.catch === 'function') { - val.catch(onDone); + async function pump() { + try { + for await (let val of stream) { + if (done) { + return; + } + + if (signal.aborted) { + throw new AbortError(); + } + + try { + val = fn(val, signalOpt); + } catch (err) { + val = PromiseReject(err); + } + + if (val === kEmpty) { + continue; + } + + if (typeof val?.catch === 'function') { + val.catch(onDone); + } + + queue.push(val); + if (next) { + next(); + next = null; + } + + if (!done && queue.length && queue.length >= concurrency) { + await new Promise((resolve) => { + resume = resolve; + }); + } } - + queue.push(kEof); + } catch (err) { + const val = PromiseReject(err); + PromisePrototypeCatch(val, onDone); queue.push(val); + } finally { + done = true; if (next) { next(); next = null; } - - if (!done && queue.length && queue.length >= concurrency) { - await new Promise((resolve) => { - resume = resolve; - }); - } - } - queue.push(kEof); - } catch (err) { - const val = PromiseReject(err); - PromisePrototypeCatch(val, onDone); - queue.push(val); - } finally { - done = true; - if (next) { - next(); - next = null; + options?.signal?.removeEventListener('abort', abort); } - options?.signal?.removeEventListener('abort', abort); } - } - - pump(); - - try { - while (true) { - while (queue.length > 0) { - const val = await queue[0]; - - if (val === kEof) { - return; - } - if (signal.aborted) { - throw new AbortError(); - } + pump(); - if (val !== kEmpty) { - yield val; + try { + while (true) { + while (queue.length > 0) { + const val = await queue[0]; + + if (val === kEof) { + return; + } + + if (signal.aborted) { + throw new AbortError(); + } + + if (val !== kEmpty) { + yield val; + } + + queue.shift(); + if (resume) { + resume(); + resume = null; + } } - queue.shift(); - if (resume) { - resume(); - resume = null; - } + await new Promise((resolve) => { + next = resolve; + }); } + } finally { + ac.abort(); - await new Promise((resolve) => { - next = resolve; - }); - } - } finally { - ac.abort(); - - done = true; - if (resume) { - resume(); - resume = null; + done = true; + if (resume) { + resume(); + resume = null; + } } - } + }.call(this); } async function* asIndexedPairs(options) { @@ -215,7 +217,7 @@ async function forEach(fn, options) { for await (const unused of this.map(forEachFn, options)); } -async function * filter(fn, options) { +function filter(fn, options) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE( 'fn', ['Function', 'AsyncFunction'], fn); @@ -226,7 +228,7 @@ async function * filter(fn, options) { } return kEmpty; } - yield* this.map(filterFn, options); + return this.map(filterFn, options); } async function toArray(options) { @@ -243,10 +245,13 @@ async function toArray(options) { return result; } -async function* flatMap(fn, options) { - for await (const val of this.map(fn, options)) { - yield* val; - } +function flatMap(fn, options) { + const values = this.map(fn, options); + return async function* flatMap() { + for await (const val of values) { + yield* val; + } + }.call(this); } function toIntegerOrInfinity(number) { diff --git a/test/parallel/test-stream-filter.js b/test/parallel/test-stream-filter.js index 100921a766977e..494c94f02f8cb0 100644 --- a/test/parallel/test-stream-filter.js +++ b/test/parallel/test-stream-filter.js @@ -87,20 +87,11 @@ const { setTimeout } = require('timers/promises'); { // Error cases - assert.rejects(async () => { - // eslint-disable-next-line no-unused-vars - for await (const unused of Readable.from([1]).filter(1)); - }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); - assert.rejects(async () => { - // eslint-disable-next-line no-unused-vars - for await (const _ of Readable.from([1]).filter((x) => x, { - concurrency: 'Foo' - })); - }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); - assert.rejects(async () => { - // eslint-disable-next-line no-unused-vars - for await (const _ of Readable.from([1]).filter((x) => x, 1)); - }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); + assert.throws(() => Readable.from([1]).filter(1), /ERR_INVALID_ARG_TYPE/); + assert.throws(() => Readable.from([1]).filter((x) => x, { + concurrency: 'Foo' + }), /ERR_OUT_OF_RANGE/); + assert.throws(() => Readable.from([1]).filter((x) => x, 1), /ERR_INVALID_ARG_TYPE/); } { // Test result is a Readable diff --git a/test/parallel/test-stream-flatMap.js b/test/parallel/test-stream-flatMap.js index 1ace8a0cf513bc..952043b07b35f7 100644 --- a/test/parallel/test-stream-flatMap.js +++ b/test/parallel/test-stream-flatMap.js @@ -109,20 +109,11 @@ function oneTo5() { { // Error cases - assert.rejects(async () => { - // eslint-disable-next-line no-unused-vars - for await (const unused of Readable.from([1]).flatMap(1)); - }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); - assert.rejects(async () => { - // eslint-disable-next-line no-unused-vars - for await (const _ of Readable.from([1]).flatMap((x) => x, { - concurrency: 'Foo' - })); - }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); - assert.rejects(async () => { - // eslint-disable-next-line no-unused-vars - for await (const _ of Readable.from([1]).flatMap((x) => x, 1)); - }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); + assert.throws(() => Readable.from([1]).flatMap(1), /ERR_INVALID_ARG_TYPE/); + assert.throws(() => Readable.from([1]).flatMap((x) => x, { + concurrency: 'Foo' + }), /ERR_OUT_OF_RANGE/); + assert.throws(() => Readable.from([1]).flatMap((x) => x, 1), /ERR_INVALID_ARG_TYPE/); } { // Test result is a Readable diff --git a/test/parallel/test-stream-map.js b/test/parallel/test-stream-map.js index 2d5c5894e1eb0f..22e7e7f4e0da2c 100644 --- a/test/parallel/test-stream-map.js +++ b/test/parallel/test-stream-map.js @@ -86,20 +86,11 @@ const { setTimeout } = require('timers/promises'); { // Error cases - assert.rejects(async () => { - // eslint-disable-next-line no-unused-vars - for await (const unused of Readable.from([1]).map(1)); - }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); - assert.rejects(async () => { - // eslint-disable-next-line no-unused-vars - for await (const _ of Readable.from([1]).map((x) => x, { - concurrency: 'Foo' - })); - }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); - assert.rejects(async () => { - // eslint-disable-next-line no-unused-vars - for await (const _ of Readable.from([1]).map((x) => x, 1)); - }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); + assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/); + assert.throws(() => Readable.from([1]).map((x) => x, { + concurrency: 'Foo' + }), /ERR_OUT_OF_RANGE/); + assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/); } { // Test result is a Readable