From 4ab1530b9bc731e33fe76ee9d2c15a937d966aea Mon Sep 17 00:00:00 2001 From: "Naor Tedgi (Abu Emma)" Date: Sat, 15 Oct 2022 13:07:47 +0300 Subject: [PATCH] lib: promise version of streams.finished call clean up implement autoCleanup logic. update docs add autoCleanup description ref: https://github.com/nodejs/node/issues/44556 PR-URL: https://github.com/nodejs/node/pull/44862 Refs: https://github.com/nodejs/node/issues/44556 Reviewed-By: Matteo Collina Reviewed-By: Robert Nagy --- doc/api/stream.md | 5 ++ lib/internal/streams/end-of-stream.js | 14 +++++- test/parallel/test-stream-promises.js | 66 ++++++++++++++++++++++----- 3 files changed, 72 insertions(+), 13 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 9bd9877326fd01..3a3c0b053aacbe 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2341,6 +2341,7 @@ changes: --> * `stream` {Stream} A readable and/or writable stream. + * `options` {Object} * `error` {boolean} If set to `false`, then a call to `emit('error', err)` is not treated as finished. **Default:** `true`. @@ -2354,8 +2355,12 @@ changes: underlying stream will _not_ be aborted if the signal is aborted. The callback will get called with an `AbortError`. All registered listeners added by this function will also be removed. + * `cleanup` {boolean} remove all registered stream listeners. + **Default:** `false`. + * `callback` {Function} A callback function that takes an optional error argument. + * Returns: {Function} A cleanup function which removes all registered listeners. diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 806280ebc1f1d2..ba565a9ae52323 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -19,6 +19,7 @@ const { validateAbortSignal, validateFunction, validateObject, + validateBoolean } = require('internal/validators'); const { Promise } = primordials; @@ -243,8 +244,19 @@ function eos(stream, options, callback) { } function finished(stream, opts) { + let autoCleanup = false; + if (opts === null) { + opts = kEmptyObject; + } + if (opts?.cleanup) { + validateBoolean(opts.cleanup, 'cleanup'); + autoCleanup = opts.cleanup; + } return new Promise((resolve, reject) => { - eos(stream, opts, (err) => { + const cleanup = eos(stream, opts, (err) => { + if (autoCleanup) { + cleanup(); + } if (err) { reject(err); } else { diff --git a/test/parallel/test-stream-promises.js b/test/parallel/test-stream-promises.js index 33bfa292720da1..fae2c37492eb5a 100644 --- a/test/parallel/test-stream-promises.js +++ b/test/parallel/test-stream-promises.js @@ -3,13 +3,10 @@ const common = require('../common'); const stream = require('stream'); const { - Readable, - Writable, - promises, + Readable, Writable, promises, } = stream; const { - finished, - pipeline, + finished, pipeline, } = require('stream/promises'); const fs = require('fs'); const assert = require('assert'); @@ -24,14 +21,11 @@ assert.strictEqual(finished, promisify(stream.finished)); { let finished = false; const processed = []; - const expected = [ - Buffer.from('a'), - Buffer.from('b'), - Buffer.from('c'), - ]; + const expected = [Buffer.from('a'), Buffer.from('b'), Buffer.from('c')]; const read = new Readable({ - read() { } + read() { + } }); const write = new Writable({ @@ -59,7 +53,8 @@ assert.strictEqual(finished, promisify(stream.finished)); // pipeline error { const read = new Readable({ - read() { } + read() { + } }); const write = new Writable({ @@ -101,3 +96,50 @@ assert.strictEqual(finished, promisify(stream.finished)); code: 'ENOENT' }).then(common.mustCall()); } + +{ + const streamObj = new Readable(); + assert.throws(() => { + // Passing cleanup option not as boolean + // should throw error + finished(streamObj, { cleanup: 2 }); + }, { code: 'ERR_INVALID_ARG_TYPE' }); +} + +// Below code should not throw any errors as the +// streamObj is `Stream` and cleanup is boolean +{ + const streamObj = new Readable(); + finished(streamObj, { cleanup: true }); +} + + +// Cleanup function should not be called when cleanup is set to false +// listenerCount should be 1 after calling finish +{ + const streamObj = new Writable(); + assert.strictEqual(streamObj.listenerCount('end'), 0); + finished(streamObj, { cleanup: false }).then(() => { + assert.strictEqual(streamObj.listenerCount('end'), 1); + }); +} + +// Cleanup function should be called when cleanup is set to true +// listenerCount should be 0 after calling finish +{ + const streamObj = new Writable(); + assert.strictEqual(streamObj.listenerCount('end'), 0); + finished(streamObj, { cleanup: true }).then(() => { + assert.strictEqual(streamObj.listenerCount('end'), 0); + }); +} + +// Cleanup function should not be called when cleanup has not been set +// listenerCount should be 1 after calling finish +{ + const streamObj = new Writable(); + assert.strictEqual(streamObj.listenerCount('end'), 0); + finished(streamObj).then(() => { + assert.strictEqual(streamObj.listenerCount('end'), 1); + }); +}