From 2fa5848a0c24ae154a54509a85f40eca46978fc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vadzim=20Zie=C5=84ka?= Date: Tue, 14 Apr 2020 16:25:55 +0300 Subject: [PATCH 01/11] stream: close iterator in Readable.from Call iterator.return() if not all of its values are consumed. Fixes: https://github.com/nodejs/node/issues/32842 --- lib/internal/streams/from.js | 30 +++- .../test-readable-from-iterator-closing.js | 163 ++++++++++++++++++ 2 files changed, 192 insertions(+), 1 deletion(-) create mode 100644 test/parallel/test-readable-from-iterator-closing.js diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index ab6db00a125a0b..f2d8dc603760ee 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -34,21 +34,49 @@ function from(Readable, iterable, opts) { objectMode: true, ...opts }); + // Reading boolean to protect against _read // being called before last iteration completion. let reading = false; + + // needToClose boolean if iterator needs to be explicitly closed + let needToClose = false; + readable._read = function() { if (!reading) { reading = true; next(); } }; + + readable._destroy = function(error, cb) { + close().then( + () => cb(error), + (e) => cb(error || e), + ); + }; + + async function close() { + if (needToClose) { + needToClose = false; + if (typeof iterator.return === 'function') { + const { value } = await iterator.return(); + await value; + } + } + } + async function next() { try { + needToClose = false; const { value, done } = await iterator.next(); + needToClose = !done; + const resolved = await value; if (done) { readable.push(null); - } else if (readable.push(await value)) { + } else if (readable.destroyed) { + await close(); + } else if (readable.push(resolved)) { next(); } else { reading = false; diff --git a/test/parallel/test-readable-from-iterator-closing.js b/test/parallel/test-readable-from-iterator-closing.js new file mode 100644 index 00000000000000..f08feeb8abd3b1 --- /dev/null +++ b/test/parallel/test-readable-from-iterator-closing.js @@ -0,0 +1,163 @@ +'use strict'; + +const { mustCall, mustNotCall } = require('../common'); +const { Readable } = require('stream'); +const { strictEqual } = require('assert'); + +async function asyncSupport() { + const finallyMustCall = mustCall(); + const bodyMustCall = mustCall(); + + async function* infiniteGenerate() { + try { + while (true) yield 'a'; + } finally { + finallyMustCall(); + } + } + + const stream = Readable.from(infiniteGenerate()); + + for await (const chunk of stream) { + bodyMustCall(); + strictEqual(chunk, 'a'); + break; + } +} + +async function syncSupport() { + const finallyMustCall = mustCall(); + const bodyMustCall = mustCall(); + + function* infiniteGenerate() { + try { + while (true) yield 'a'; + } finally { + finallyMustCall(); + } + } + + const stream = Readable.from(infiniteGenerate()); + + for await (const chunk of stream) { + bodyMustCall(); + strictEqual(chunk, 'a'); + break; + } +} + +async function syncPromiseSupport() { + const returnMustBeAwaited = mustCall(); + const bodyMustCall = mustCall(); + + function* infiniteGenerate() { + try { + while (true) yield Promise.resolve('a'); + } finally { + // eslint-disable-next-line no-unsafe-finally + return { then(cb) { + returnMustBeAwaited(); + cb(); + } }; + } + } + + const stream = Readable.from(infiniteGenerate()); + + for await (const chunk of stream) { + bodyMustCall(); + strictEqual(chunk, 'a'); + break; + } +} + +async function syncRejectedSupport() { + const returnMustBeAwaited = mustCall(); + const bodyMustNotCall = mustNotCall(); + const catchMustCall = mustCall(); + const secondNextMustNotCall = mustNotCall(); + + function* generate() { + try { + yield Promise.reject('a'); + secondNextMustNotCall(); + } finally { + // eslint-disable-next-line no-unsafe-finally + return { then(cb) { + returnMustBeAwaited(); + cb(); + } }; + } + } + + const stream = Readable.from(generate()); + + try { + for await (const chunk of stream) { + bodyMustNotCall(chunk); + } + } catch { + catchMustCall(); + } +} + +async function noReturnAfterThrow() { + const returnMustNotCall = mustNotCall(); + const bodyMustNotCall = mustNotCall(); + const catchMustCall = mustCall(); + const nextMustCall = mustCall(); + + const stream = Readable.from({ + [Symbol.asyncIterator]() { return this; }, + async next() { + nextMustCall(); + throw new Error('a'); + }, + async return() { + returnMustNotCall(); + return { done: true }; + }, + }); + + try { + for await (const chunk of stream) { + bodyMustNotCall(chunk); + } + } catch { + catchMustCall(); + } +} + +async function closeStreamWhileNextIsPending() { + const finallyMustCall = mustCall(); + let resolveDestroy; + const destroyed = new Promise((resolve) => { resolveDestroy = resolve; }); + + async function* infiniteGenerate() { + try { + while (true) { + yield 'a'; + await destroyed; + } + } finally { + finallyMustCall(); + } + } + + const stream = Readable.from(infiniteGenerate()); + + stream.on('data', (data) => { + strictEqual(data, 'a'); + stream.destroy(); + resolveDestroy(); + }); +} + +Promise.all([ + asyncSupport(), + syncSupport(), + syncPromiseSupport(), + syncRejectedSupport(), + noReturnAfterThrow(), + closeStreamWhileNextIsPending() +]).then(mustCall()); From 980a7c88d32b31871215fa57b01616577dfdda43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vadzim=20Zie=C5=84ka?= Date: Wed, 15 Apr 2020 13:32:50 +0300 Subject: [PATCH 02/11] fixup: explicit `yielded` event in closeStreamWhileNextIsPending --- .../parallel/test-readable-from-iterator-closing.js | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/test/parallel/test-readable-from-iterator-closing.js b/test/parallel/test-readable-from-iterator-closing.js index f08feeb8abd3b1..015397c4a2e886 100644 --- a/test/parallel/test-readable-from-iterator-closing.js +++ b/test/parallel/test-readable-from-iterator-closing.js @@ -130,13 +130,20 @@ async function noReturnAfterThrow() { async function closeStreamWhileNextIsPending() { const finallyMustCall = mustCall(); + const dataMustCall = mustCall(); + let resolveDestroy; - const destroyed = new Promise((resolve) => { resolveDestroy = resolve; }); + const destroyed = + new Promise((resolve) => { resolveDestroy = mustCall(resolve); }); + let resolveYielded; + const yielded = + new Promise((resolve) => { resolveYielded = mustCall(resolve); }); async function* infiniteGenerate() { try { while (true) { yield 'a'; + resolveYielded(); await destroyed; } } finally { @@ -147,7 +154,11 @@ async function closeStreamWhileNextIsPending() { const stream = Readable.from(infiniteGenerate()); stream.on('data', (data) => { + dataMustCall(); strictEqual(data, 'a'); + }); + + yielded.then(() => { stream.destroy(); resolveDestroy(); }); From b772927daa936df3b2b1ac88df45a5c26204d966 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vadzim=20Zie=C5=84ka?= Date: Wed, 15 Apr 2020 13:37:50 +0300 Subject: [PATCH 03/11] fixup: test fot iterator closing after null has been yielded --- .../test-readable-from-iterator-closing.js | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/test/parallel/test-readable-from-iterator-closing.js b/test/parallel/test-readable-from-iterator-closing.js index 015397c4a2e886..2397e1fbf152c0 100644 --- a/test/parallel/test-readable-from-iterator-closing.js +++ b/test/parallel/test-readable-from-iterator-closing.js @@ -164,11 +164,35 @@ async function closeStreamWhileNextIsPending() { }); } +async function closeAfterNullYielded() { + const finallyMustCall = mustCall(); + const bodyMustCall = mustCall(3); + + function* infiniteGenerate() { + try { + yield 'a'; + yield 'a'; + yield 'a'; + while (true) yield null; + } finally { + finallyMustCall(); + } + } + + const stream = Readable.from(infiniteGenerate()); + + for await (const chunk of stream) { + bodyMustCall(); + strictEqual(chunk, 'a'); + } +} + Promise.all([ asyncSupport(), syncSupport(), syncPromiseSupport(), syncRejectedSupport(), noReturnAfterThrow(), - closeStreamWhileNextIsPending() + closeStreamWhileNextIsPending(), + closeAfterNullYielded(), ]).then(mustCall()); From 57bb12656ed31c83a5c56b99a1ec5698ebbd3b36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vadzim=20Zie=C5=84ka?= Date: Wed, 15 Apr 2020 13:58:07 +0300 Subject: [PATCH 04/11] fixup: escape unhandled rejected promise --- lib/internal/streams/from.js | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index f2d8dc603760ee..604a87864ff73f 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -49,11 +49,14 @@ function from(Readable, iterable, opts) { } }; - readable._destroy = function(error, cb) { - close().then( - () => cb(error), - (e) => cb(error || e), - ); + readable._destroy = async function(error, cb) { + try { + await close(); + } catch (e) { + error = error || e; + } finally { + cb(error); + } }; async function close() { From 8a2974f7dfcdd34d0a67cd60873bcbf4d4c7d70e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vadzim=20Zie=C5=84ka?= Date: Wed, 15 Apr 2020 14:20:54 +0300 Subject: [PATCH 05/11] fixup: test for closing iterator on yielded null with plain event handling --- test/parallel/test-readable-from-iterator-closing.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/parallel/test-readable-from-iterator-closing.js b/test/parallel/test-readable-from-iterator-closing.js index 2397e1fbf152c0..5bf824607a6336 100644 --- a/test/parallel/test-readable-from-iterator-closing.js +++ b/test/parallel/test-readable-from-iterator-closing.js @@ -181,10 +181,10 @@ async function closeAfterNullYielded() { const stream = Readable.from(infiniteGenerate()); - for await (const chunk of stream) { + stream.on('data', (chunk) => { bodyMustCall(); strictEqual(chunk, 'a'); - } + }); } Promise.all([ From 8528ad485b40487ded3805ccff617eac77ead389 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vadzim=20Zie=C5=84ka?= Date: Wed, 15 Apr 2020 16:44:16 +0300 Subject: [PATCH 06/11] fixup: fix variable name --- test/parallel/test-readable-from-iterator-closing.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/parallel/test-readable-from-iterator-closing.js b/test/parallel/test-readable-from-iterator-closing.js index 5bf824607a6336..0254ccfc163093 100644 --- a/test/parallel/test-readable-from-iterator-closing.js +++ b/test/parallel/test-readable-from-iterator-closing.js @@ -166,7 +166,7 @@ async function closeStreamWhileNextIsPending() { async function closeAfterNullYielded() { const finallyMustCall = mustCall(); - const bodyMustCall = mustCall(3); + const dataMustCall = mustCall(3); function* infiniteGenerate() { try { @@ -182,7 +182,7 @@ async function closeAfterNullYielded() { const stream = Readable.from(infiniteGenerate()); stream.on('data', (chunk) => { - bodyMustCall(); + dataMustCall(); strictEqual(chunk, 'a'); }); } From e821b987ccc31c5fcf92c1f558d31f0400baa565 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vadzim=20Zie=C5=84ka?= Date: Wed, 15 Apr 2020 16:44:50 +0300 Subject: [PATCH 07/11] fixup: run callback with process.nextTick --- lib/internal/streams/from.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index 604a87864ff73f..7a0872330eaea3 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -55,7 +55,7 @@ function from(Readable, iterable, opts) { } catch (e) { error = error || e; } finally { - cb(error); + process.nextTick(() => cb(error)); } }; From e9becca61ad97748ffbafb550f21349253a14c74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vadzim=20Zie=C5=84ka?= Date: Wed, 15 Apr 2020 16:56:04 +0300 Subject: [PATCH 08/11] remove unnesessary closure --- lib/internal/streams/from.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index 7a0872330eaea3..891cedaf77c9c5 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -55,7 +55,7 @@ function from(Readable, iterable, opts) { } catch (e) { error = error || e; } finally { - process.nextTick(() => cb(error)); + process.nextTick(cb, error); } }; From 509be2c3b3633bd644a6fc3f4fcda1ac5f131657 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vadzim=20Zie=C5=84ka?= Date: Wed, 15 Apr 2020 17:06:12 +0300 Subject: [PATCH 09/11] fixup: do not doublecheck needToClose flag --- lib/internal/streams/from.js | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index 891cedaf77c9c5..41aee6ddfd60dd 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -51,7 +51,10 @@ function from(Readable, iterable, opts) { readable._destroy = async function(error, cb) { try { - await close(); + if (needToClose) { + needToClose = false; + await close(); + } } catch (e) { error = error || e; } finally { @@ -60,12 +63,9 @@ function from(Readable, iterable, opts) { }; async function close() { - if (needToClose) { - needToClose = false; - if (typeof iterator.return === 'function') { - const { value } = await iterator.return(); - await value; - } + if (typeof iterator.return === 'function') { + const { value } = await iterator.return(); + await value; } } From 7aac752bdbf39d23bd6a5d4448802fa37f7e6936 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vadzim=20Zie=C5=84ka?= Date: Thu, 16 Apr 2020 10:51:42 +0300 Subject: [PATCH 10/11] fixup: make _destroy not async --- lib/internal/streams/from.js | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index 41aee6ddfd60dd..ca567914bbf0fe 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -49,16 +49,15 @@ function from(Readable, iterable, opts) { } }; - readable._destroy = async function(error, cb) { - try { - if (needToClose) { - needToClose = false; - await close(); - } - } catch (e) { - error = error || e; - } finally { - process.nextTick(cb, error); + readable._destroy = function(error, cb) { + if (needToClose) { + needToClose = false; + close().then( + () => process.nextTick(cb, error), + (e) => process.nextTick(cb, error || e), + ); + } else { + cb(error); } }; From bffbf584f04e7c4461ddba6dbd482af23217985f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vadzim=20Zie=C5=84ka?= Date: Fri, 17 Apr 2020 11:18:05 +0300 Subject: [PATCH 11/11] fixup: trigger build