Skip to content

Commit

Permalink
lib: promise version of streams.finished call clean up
Browse files Browse the repository at this point in the history
implement autoCleanup logic. update docs add autoCleanup description

ref: #44556
PR-URL: #44862
Refs: #44556
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
  • Loading branch information
ntedgi authored and danielleadams committed Jan 3, 2023
1 parent 2a3bd11 commit 4ab1530
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 13 deletions.
5 changes: 5 additions & 0 deletions doc/api/stream.md
Expand Up @@ -2341,6 +2341,7 @@ changes:
-->

* `stream` {Stream} A readable and/or writable stream.

* `options` {Object}
* `error` {boolean} If set to `false`, then a call to `emit('error', err)` is
not treated as finished. **Default:** `true`.
Expand All @@ -2354,8 +2355,12 @@ changes:
underlying stream will _not_ be aborted if the signal is aborted. The
callback will get called with an `AbortError`. All registered
listeners added by this function will also be removed.
* `cleanup` {boolean} remove all registered stream listeners.
**Default:** `false`.

* `callback` {Function} A callback function that takes an optional error
argument.

* Returns: {Function} A cleanup function which removes all registered
listeners.

Expand Down
14 changes: 13 additions & 1 deletion lib/internal/streams/end-of-stream.js
Expand Up @@ -19,6 +19,7 @@ const {
validateAbortSignal,
validateFunction,
validateObject,
validateBoolean
} = require('internal/validators');

const { Promise } = primordials;
Expand Down Expand Up @@ -243,8 +244,19 @@ function eos(stream, options, callback) {
}

function finished(stream, opts) {
let autoCleanup = false;
if (opts === null) {
opts = kEmptyObject;
}
if (opts?.cleanup) {
validateBoolean(opts.cleanup, 'cleanup');
autoCleanup = opts.cleanup;
}
return new Promise((resolve, reject) => {
eos(stream, opts, (err) => {
const cleanup = eos(stream, opts, (err) => {
if (autoCleanup) {
cleanup();
}
if (err) {
reject(err);
} else {
Expand Down
66 changes: 54 additions & 12 deletions test/parallel/test-stream-promises.js
Expand Up @@ -3,13 +3,10 @@
const common = require('../common');
const stream = require('stream');
const {
Readable,
Writable,
promises,
Readable, Writable, promises,
} = stream;
const {
finished,
pipeline,
finished, pipeline,
} = require('stream/promises');
const fs = require('fs');
const assert = require('assert');
Expand All @@ -24,14 +21,11 @@ assert.strictEqual(finished, promisify(stream.finished));
{
let finished = false;
const processed = [];
const expected = [
Buffer.from('a'),
Buffer.from('b'),
Buffer.from('c'),
];
const expected = [Buffer.from('a'), Buffer.from('b'), Buffer.from('c')];

const read = new Readable({
read() { }
read() {
}
});

const write = new Writable({
Expand Down Expand Up @@ -59,7 +53,8 @@ assert.strictEqual(finished, promisify(stream.finished));
// pipeline error
{
const read = new Readable({
read() { }
read() {
}
});

const write = new Writable({
Expand Down Expand Up @@ -101,3 +96,50 @@ assert.strictEqual(finished, promisify(stream.finished));
code: 'ENOENT'
}).then(common.mustCall());
}

{
const streamObj = new Readable();
assert.throws(() => {
// Passing cleanup option not as boolean
// should throw error
finished(streamObj, { cleanup: 2 });
}, { code: 'ERR_INVALID_ARG_TYPE' });
}

// Below code should not throw any errors as the
// streamObj is `Stream` and cleanup is boolean
{
const streamObj = new Readable();
finished(streamObj, { cleanup: true });
}


// Cleanup function should not be called when cleanup is set to false
// listenerCount should be 1 after calling finish
{
const streamObj = new Writable();
assert.strictEqual(streamObj.listenerCount('end'), 0);
finished(streamObj, { cleanup: false }).then(() => {
assert.strictEqual(streamObj.listenerCount('end'), 1);
});
}

// Cleanup function should be called when cleanup is set to true
// listenerCount should be 0 after calling finish
{
const streamObj = new Writable();
assert.strictEqual(streamObj.listenerCount('end'), 0);
finished(streamObj, { cleanup: true }).then(() => {
assert.strictEqual(streamObj.listenerCount('end'), 0);
});
}

// Cleanup function should not be called when cleanup has not been set
// listenerCount should be 1 after calling finish
{
const streamObj = new Writable();
assert.strictEqual(streamObj.listenerCount('end'), 0);
finished(streamObj).then(() => {
assert.strictEqual(streamObj.listenerCount('end'), 1);
});
}

0 comments on commit 4ab1530

Please sign in to comment.