diff --git a/doc/api/stream.md b/doc/api/stream.md index fb2f2da28d1a87..f2e1c2a0c4ff9e 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1493,6 +1493,11 @@ changes: pr-url: https://github.com/nodejs/node/pull/18438 description: > Add `emitClose` option to specify if `'close'` is emitted on destroy + - version: REPLACEME + pr-url: https://github.com/nodejs/node/pull/22795 + description: > + Add `autoDestroy` option to automatically `destroy()` the stream + when it emits `'finish'` or errors --> * `options` {Object} @@ -1521,6 +1526,8 @@ changes: [`stream._destroy()`][writable-_destroy] method. * `final` {Function} Implementation for the [`stream._final()`][stream-_final] method. + * `autoDestroy` {boolean} Whether this stream should automatically call + `.destroy()` on itself after ending. **Default:** `false`. ```js const { Writable } = require('stream'); @@ -1756,6 +1763,14 @@ Custom `Readable` streams *must* call the `new stream.Readable([options])` constructor and implement the `readable._read()` method. #### new stream.Readable([options]) + * `options` {Object} * `highWaterMark` {number} The maximum [number of bytes][hwm-gotcha] to store @@ -1770,6 +1785,8 @@ constructor and implement the `readable._read()` method. method. * `destroy` {Function} Implementation for the [`stream._destroy()`][readable-_destroy] method. + * `autoDestroy` {boolean} Whether this stream should automatically call + `.destroy()` on itself after ending. **Default:** `false`. ```js const { Readable } = require('stream'); diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 488d10a10b5bbd..2a2122e0e553cd 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -46,6 +46,7 @@ let createReadableStreamAsyncIterator; util.inherits(Readable, Stream); +const { errorOrDestroy } = destroyImpl; const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume']; function prependListener(emitter, event, fn) { @@ -117,6 +118,9 @@ function ReadableState(options, stream, isDuplex) { // Should close be emitted on destroy. Defaults to true. this.emitClose = options.emitClose !== false; + // Should .destroy() be called after 'end' (and potentially 'finish') + this.autoDestroy = !!options.autoDestroy; + // has it been destroyed this.destroyed = false; @@ -235,7 +239,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { if (!skipChunkCheck) er = chunkInvalid(state, chunk); if (er) { - stream.emit('error', er); + errorOrDestroy(stream, er); } else if (state.objectMode || chunk && chunk.length > 0) { if (typeof chunk !== 'string' && !state.objectMode && @@ -245,11 +249,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { if (addToFront) { if (state.endEmitted) - stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); + errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); else addChunk(stream, state, chunk, true); } else if (state.ended) { - stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF()); + errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); } else if (state.destroyed) { return false; } else { @@ -581,7 +585,7 @@ function maybeReadMore_(stream, state) { // for virtual (non-string, non-buffer) streams, "length" is somewhat // arbitrary, and perhaps not very meaningful. Readable.prototype._read = function(n) { - this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()')); + errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()')); }; Readable.prototype.pipe = function(dest, pipeOpts) { @@ -687,7 +691,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { unpipe(); dest.removeListener('error', onerror); if (EE.listenerCount(dest, 'error') === 0) - dest.emit('error', er); + errorOrDestroy(dest, er); } // Make sure our error handler is attached before userland ones. @@ -1092,5 +1096,14 @@ function endReadableNT(state, stream) { state.endEmitted = true; stream.readable = false; stream.emit('end'); + + if (state.autoDestroy) { + // In case of duplex streams we need a way to detect + // if the writable side is ready for autoDestroy as well + const wState = stream._writableState; + if (!wState || (wState.autoDestroy && wState.finished)) { + stream.destroy(); + } + } } } diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 3bad957912b323..160179cd0e84fa 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -45,6 +45,8 @@ const { ERR_UNKNOWN_ENCODING } = require('internal/errors').codes; +const { errorOrDestroy } = destroyImpl; + util.inherits(Writable, Stream); function nop() {} @@ -147,6 +149,9 @@ function WritableState(options, stream, isDuplex) { // Should close be emitted on destroy. Defaults to true. this.emitClose = options.emitClose !== false; + // Should .destroy() be called after 'finish' (and potentially 'end') + this.autoDestroy = !!options.autoDestroy; + // count buffered requests this.bufferedRequestCount = 0; @@ -235,14 +240,14 @@ function Writable(options) { // Otherwise people can pipe Writable streams, which is just wrong. Writable.prototype.pipe = function() { - this.emit('error', new ERR_STREAM_CANNOT_PIPE()); + errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); }; function writeAfterEnd(stream, cb) { var er = new ERR_STREAM_WRITE_AFTER_END(); // TODO: defer error events consistently everywhere, not just the cb - stream.emit('error', er); + errorOrDestroy(stream, er); process.nextTick(cb, er); } @@ -258,7 +263,7 @@ function validChunk(stream, state, chunk, cb) { er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk); } if (er) { - stream.emit('error', er); + errorOrDestroy(stream, er); process.nextTick(cb, er); return false; } @@ -422,13 +427,13 @@ function onwriteError(stream, state, sync, er, cb) { // after error process.nextTick(finishMaybe, stream, state); stream._writableState.errorEmitted = true; - stream.emit('error', er); + errorOrDestroy(stream, er); } else { // the caller expect this to happen before if // it is async cb(er); stream._writableState.errorEmitted = true; - stream.emit('error', er); + errorOrDestroy(stream, er); // this can emit finish, but finish must // always follow error finishMaybe(stream, state); @@ -612,7 +617,7 @@ function callFinal(stream, state) { stream._final((err) => { state.pendingcb--; if (err) { - stream.emit('error', err); + errorOrDestroy(stream, err); } state.prefinished = true; stream.emit('prefinish'); @@ -639,6 +644,15 @@ function finishMaybe(stream, state) { if (state.pendingcb === 0) { state.finished = true; stream.emit('finish'); + + if (state.autoDestroy) { + // In case of duplex streams we need a way to detect + // if the readable side is ready for autoDestroy as well + const rState = stream._readableState; + if (!rState || (rState.autoDestroy && rState.endEmitted)) { + stream.destroy(); + } + } } } return need; diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 3a0383cc3cea70..ce9d2545e45022 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -82,7 +82,25 @@ function emitErrorNT(self, err) { self.emit('error', err); } +function errorOrDestroy(stream, err) { + // We have tests that rely on errors being emitted + // in the same tick, so changing this is semver major. + // For now when you opt-in to autoDestroy we allow + // the error to be emitted nextTick. In a future + // semver major update we should change the default to this. + + const rState = stream._readableState; + const wState = stream._writableState; + + if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy)) + stream.destroy(err); + else + stream.emit('error', err); +} + + module.exports = { destroy, - undestroy + undestroy, + errorOrDestroy }; diff --git a/test/parallel/test-stream-auto-destroy.js b/test/parallel/test-stream-auto-destroy.js new file mode 100644 index 00000000000000..7bce8a56368313 --- /dev/null +++ b/test/parallel/test-stream-auto-destroy.js @@ -0,0 +1,84 @@ +'use strict'; +const common = require('../common'); +const stream = require('stream'); +const assert = require('assert'); + +{ + const r = new stream.Readable({ + autoDestroy: true, + read() { + this.push('hello'); + this.push('world'); + this.push(null); + }, + destroy: common.mustCall((err, cb) => cb()) + }); + + let ended = false; + + r.resume(); + + r.on('end', common.mustCall(() => { + ended = true; + })); + + r.on('close', common.mustCall(() => { + assert(ended); + })); +} + +{ + const w = new stream.Writable({ + autoDestroy: true, + write(data, enc, cb) { + cb(null); + }, + destroy: common.mustCall((err, cb) => cb()) + }); + + let finished = false; + + w.write('hello'); + w.write('world'); + w.end(); + + w.on('finish', common.mustCall(() => { + finished = true; + })); + + w.on('close', common.mustCall(() => { + assert(finished); + })); +} + +{ + const t = new stream.Transform({ + autoDestroy: true, + transform(data, enc, cb) { + cb(null, data); + }, + destroy: common.mustCall((err, cb) => cb()) + }); + + let ended = false; + let finished = false; + + t.write('hello'); + t.write('world'); + t.end(); + + t.resume(); + + t.on('end', common.mustCall(() => { + ended = true; + })); + + t.on('finish', common.mustCall(() => { + finished = true; + })); + + t.on('close', common.mustCall(() => { + assert(ended); + assert(finished); + })); +}