From 9b778f192123bd3e2ecc8139a9c1acb31a872a13 Mon Sep 17 00:00:00 2001 From: naortedgi Date: Sun, 2 Oct 2022 12:49:49 +0300 Subject: [PATCH 1/4] lib: promise version of streams.finished call clean up implement autoCleanup logic. update docs add autoCleanup description ref: https://github.com/nodejs/node/issues/44556 --- doc/api/stream.md | 3 +++ lib/internal/streams/end-of-stream.js | 14 +++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 7c0cdfe8e5d531..52fc787f287cc2 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2376,6 +2376,9 @@ changes: underlying stream will _not_ be aborted if the signal is aborted. The callback will get called with an `AbortError`. All registered listeners added by this function will also be removed. + * `autoCleanup` {boolean} remove all registered stream listeners. + **Default:** `true`. + * `callback` {Function} A callback function that takes an optional error argument. * Returns: {Function} A cleanup function which removes all registered diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 806280ebc1f1d2..50569bca1c655a 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -19,6 +19,7 @@ const { validateAbortSignal, validateFunction, validateObject, + validateBoolean } = require('internal/validators'); const { Promise } = primordials; @@ -243,8 +244,19 @@ function eos(stream, options, callback) { } function finished(stream, opts) { + let autoCleanup = true; + if (opts === null) { + opts = kEmptyObject; + } + if (opts.autoCleanup) { + validateBoolean(opts.autoCleanup, 'autoCleanup'); + autoCleanup = opts.autoCleanup; + } return new Promise((resolve, reject) => { - eos(stream, opts, (err) => { + const cleanup = eos(stream, opts, (err) => { + if (autoCleanup) { + cleanup(); + } if (err) { reject(err); } else { From cd80cdc8118197507e19d023ddda1eecdcbd2215 Mon Sep 17 00:00:00 2001 From: naortedgi Date: Sun, 2 Oct 2022 22:53:13 +0300 Subject: [PATCH 2/4] test: adding unit tests to cover the autoCleanup parameter logic doc: update autoCleanup default value to false --- doc/api/stream.md | 3 +- lib/internal/streams/end-of-stream.js | 4 +-- test/parallel/test-stream-promises.js | 50 +++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 3 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 52fc787f287cc2..fc4c208be9157e 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2377,10 +2377,11 @@ changes: callback will get called with an `AbortError`. All registered listeners added by this function will also be removed. * `autoCleanup` {boolean} remove all registered stream listeners. - **Default:** `true`. + **Default:** `false`. * `callback` {Function} A callback function that takes an optional error argument. + * Returns: {Function} A cleanup function which removes all registered listeners. diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 50569bca1c655a..730856721810c9 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -244,11 +244,11 @@ function eos(stream, options, callback) { } function finished(stream, opts) { - let autoCleanup = true; + let autoCleanup = false; if (opts === null) { opts = kEmptyObject; } - if (opts.autoCleanup) { + if (opts?.autoCleanup) { validateBoolean(opts.autoCleanup, 'autoCleanup'); autoCleanup = opts.autoCleanup; } diff --git a/test/parallel/test-stream-promises.js b/test/parallel/test-stream-promises.js index 33bfa292720da1..cea303e7dc3796 100644 --- a/test/parallel/test-stream-promises.js +++ b/test/parallel/test-stream-promises.js @@ -101,3 +101,53 @@ assert.strictEqual(finished, promisify(stream.finished)); code: 'ENOENT' }).then(common.mustCall()); } + +{ + const streamObj = new Readable(); + assert.throws( + () => { + // Passing autoCleanup option not as boolean + // should throw error + finished(streamObj, {autoCleanup: 2}); + }, + {code: 'ERR_INVALID_ARG_TYPE'} + ); +} + +// Below code should not throw any errors as the +// streamObj is `Stream` and autoCleanup is boolean +{ + const streamObj = new Readable(); + finished(streamObj, {autoCleanup: true}) +} + + +// cleanup function should not be called when autoCleanup is set to false +// listenerCount should be 1 after calling finish +{ + const streamObj = new Writable(); + assert(streamObj.listenerCount('end') === 0); + finished(streamObj, {autoCleanup: false}).then(() => { + assert(streamObj.listenerCount('end') === 1); + }) +} + +// cleanup function should be called when autoCleanup is set to true +// listenerCount should be 0 after calling finish +{ + const streamObj = new Writable(); + assert(streamObj.listenerCount('end') === 0); + finished(streamObj, {autoCleanup: true}).then(() => { + assert(streamObj.listenerCount('end') === 0); + }) +} + +// cleanup function should not be called when autoCleanup has not been set +// listenerCount should be 1 after calling finish +{ + const streamObj = new Writable(); + assert(streamObj.listenerCount('end') === 0); + finished(streamObj).then(() => { + assert(streamObj.listenerCount('end') === 1); + }) +} From b65461a80c69e841e48fb9b4aeb96fa185f0e26f Mon Sep 17 00:00:00 2001 From: naortedgi Date: Sun, 2 Oct 2022 23:38:49 +0300 Subject: [PATCH 3/4] fix linting issues --- doc/api/stream.md | 1 + test/parallel/test-stream-promises.js | 62 ++++++++++++--------------- 2 files changed, 28 insertions(+), 35 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index fc4c208be9157e..32cea87689cba0 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2363,6 +2363,7 @@ changes: --> * `stream` {Stream} A readable and/or writable stream. + * `options` {Object} * `error` {boolean} If set to `false`, then a call to `emit('error', err)` is not treated as finished. **Default:** `true`. diff --git a/test/parallel/test-stream-promises.js b/test/parallel/test-stream-promises.js index cea303e7dc3796..0c06e925b37bb5 100644 --- a/test/parallel/test-stream-promises.js +++ b/test/parallel/test-stream-promises.js @@ -3,13 +3,10 @@ const common = require('../common'); const stream = require('stream'); const { - Readable, - Writable, - promises, + Readable, Writable, promises, } = stream; const { - finished, - pipeline, + finished, pipeline, } = require('stream/promises'); const fs = require('fs'); const assert = require('assert'); @@ -24,14 +21,11 @@ assert.strictEqual(finished, promisify(stream.finished)); { let finished = false; const processed = []; - const expected = [ - Buffer.from('a'), - Buffer.from('b'), - Buffer.from('c'), - ]; + const expected = [Buffer.from('a'), Buffer.from('b'), Buffer.from('c')]; const read = new Readable({ - read() { } + read() { + } }); const write = new Writable({ @@ -59,7 +53,8 @@ assert.strictEqual(finished, promisify(stream.finished)); // pipeline error { const read = new Readable({ - read() { } + read() { + } }); const write = new Writable({ @@ -104,50 +99,47 @@ assert.strictEqual(finished, promisify(stream.finished)); { const streamObj = new Readable(); - assert.throws( - () => { - // Passing autoCleanup option not as boolean - // should throw error - finished(streamObj, {autoCleanup: 2}); - }, - {code: 'ERR_INVALID_ARG_TYPE'} - ); + assert.throws(() => { + // Passing autoCleanup option not as boolean + // should throw error + finished(streamObj, { autoCleanup: 2 }); + }, { code: 'ERR_INVALID_ARG_TYPE' }); } // Below code should not throw any errors as the // streamObj is `Stream` and autoCleanup is boolean { const streamObj = new Readable(); - finished(streamObj, {autoCleanup: true}) + finished(streamObj, { autoCleanup: true }); } -// cleanup function should not be called when autoCleanup is set to false +// Cleanup function should not be called when autoCleanup is set to false // listenerCount should be 1 after calling finish { const streamObj = new Writable(); - assert(streamObj.listenerCount('end') === 0); - finished(streamObj, {autoCleanup: false}).then(() => { - assert(streamObj.listenerCount('end') === 1); - }) + assert.strictEqual(streamObj.listenerCount('end'), 0); + finished(streamObj, { autoCleanup: false }).then(() => { + assert.strictEqual(streamObj.listenerCount('end'), 1); + }); } -// cleanup function should be called when autoCleanup is set to true +// Cleanup function should be called when autoCleanup is set to true // listenerCount should be 0 after calling finish { const streamObj = new Writable(); - assert(streamObj.listenerCount('end') === 0); - finished(streamObj, {autoCleanup: true}).then(() => { - assert(streamObj.listenerCount('end') === 0); - }) + assert.strictEqual(streamObj.listenerCount('end'), 0); + finished(streamObj, { autoCleanup: true }).then(() => { + assert.strictEqual(streamObj.listenerCount('end'), 0); + }); } -// cleanup function should not be called when autoCleanup has not been set +// Cleanup function should not be called when autoCleanup has not been set // listenerCount should be 1 after calling finish { const streamObj = new Writable(); - assert(streamObj.listenerCount('end') === 0); + assert.strictEqual(streamObj.listenerCount('end'), 0); finished(streamObj).then(() => { - assert(streamObj.listenerCount('end') === 1); - }) + assert.strictEqual(streamObj.listenerCount('end'), 1); + }); } From 16d45f1bede0df5149c7a6d134bc9957e8c9445d Mon Sep 17 00:00:00 2001 From: naortedgi Date: Tue, 11 Oct 2022 00:22:41 +0300 Subject: [PATCH 4/4] rename parameter from `autoCleanup` to `cleanup` --- doc/api/stream.md | 2 +- lib/internal/streams/end-of-stream.js | 6 +++--- test/parallel/test-stream-promises.js | 18 +++++++++--------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 32cea87689cba0..5b65340f494685 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2377,7 +2377,7 @@ changes: underlying stream will _not_ be aborted if the signal is aborted. The callback will get called with an `AbortError`. All registered listeners added by this function will also be removed. - * `autoCleanup` {boolean} remove all registered stream listeners. + * `cleanup` {boolean} remove all registered stream listeners. **Default:** `false`. * `callback` {Function} A callback function that takes an optional error diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 730856721810c9..ba565a9ae52323 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -248,9 +248,9 @@ function finished(stream, opts) { if (opts === null) { opts = kEmptyObject; } - if (opts?.autoCleanup) { - validateBoolean(opts.autoCleanup, 'autoCleanup'); - autoCleanup = opts.autoCleanup; + if (opts?.cleanup) { + validateBoolean(opts.cleanup, 'cleanup'); + autoCleanup = opts.cleanup; } return new Promise((resolve, reject) => { const cleanup = eos(stream, opts, (err) => { diff --git a/test/parallel/test-stream-promises.js b/test/parallel/test-stream-promises.js index 0c06e925b37bb5..fae2c37492eb5a 100644 --- a/test/parallel/test-stream-promises.js +++ b/test/parallel/test-stream-promises.js @@ -100,41 +100,41 @@ assert.strictEqual(finished, promisify(stream.finished)); { const streamObj = new Readable(); assert.throws(() => { - // Passing autoCleanup option not as boolean + // Passing cleanup option not as boolean // should throw error - finished(streamObj, { autoCleanup: 2 }); + finished(streamObj, { cleanup: 2 }); }, { code: 'ERR_INVALID_ARG_TYPE' }); } // Below code should not throw any errors as the -// streamObj is `Stream` and autoCleanup is boolean +// streamObj is `Stream` and cleanup is boolean { const streamObj = new Readable(); - finished(streamObj, { autoCleanup: true }); + finished(streamObj, { cleanup: true }); } -// Cleanup function should not be called when autoCleanup is set to false +// Cleanup function should not be called when cleanup is set to false // listenerCount should be 1 after calling finish { const streamObj = new Writable(); assert.strictEqual(streamObj.listenerCount('end'), 0); - finished(streamObj, { autoCleanup: false }).then(() => { + finished(streamObj, { cleanup: false }).then(() => { assert.strictEqual(streamObj.listenerCount('end'), 1); }); } -// Cleanup function should be called when autoCleanup is set to true +// Cleanup function should be called when cleanup is set to true // listenerCount should be 0 after calling finish { const streamObj = new Writable(); assert.strictEqual(streamObj.listenerCount('end'), 0); - finished(streamObj, { autoCleanup: true }).then(() => { + finished(streamObj, { cleanup: true }).then(() => { assert.strictEqual(streamObj.listenerCount('end'), 0); }); } -// Cleanup function should not be called when autoCleanup has not been set +// Cleanup function should not be called when cleanup has not been set // listenerCount should be 1 after calling finish { const streamObj = new Writable();