Skip to content

Commit

Permalink
stream: add auto-destroy mode
Browse files Browse the repository at this point in the history
PR-URL: #22795
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Jeremiah Senkpiel <fishrock123@rocketmail.com>
  • Loading branch information
mafintosh authored and targos committed Nov 2, 2018
1 parent 7a2134c commit d6bcf8b
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 12 deletions.
17 changes: 17 additions & 0 deletions doc/api/stream.md
Expand Up @@ -1493,6 +1493,11 @@ changes:
pr-url: https://github.com/nodejs/node/pull/18438
description: >
Add `emitClose` option to specify if `'close'` is emitted on destroy
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/22795
description: >
Add `autoDestroy` option to automatically `destroy()` the stream
when it emits `'finish'` or errors
-->

* `options` {Object}
Expand Down Expand Up @@ -1521,6 +1526,8 @@ changes:
[`stream._destroy()`][writable-_destroy] method.
* `final` {Function} Implementation for the
[`stream._final()`][stream-_final] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `false`.

```js
const { Writable } = require('stream');
Expand Down Expand Up @@ -1756,6 +1763,14 @@ Custom `Readable` streams *must* call the `new stream.Readable([options])`
constructor and implement the `readable._read()` method.

#### new stream.Readable([options])
<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/22795
description: >
Add `autoDestroy` option to automatically `destroy()` the stream
when it emits `'end'` or errors
-->

* `options` {Object}
* `highWaterMark` {number} The maximum [number of bytes][hwm-gotcha] to store
Expand All @@ -1770,6 +1785,8 @@ constructor and implement the `readable._read()` method.
method.
* `destroy` {Function} Implementation for the
[`stream._destroy()`][readable-_destroy] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `false`.

```js
const { Readable } = require('stream');
Expand Down
23 changes: 18 additions & 5 deletions lib/_stream_readable.js
Expand Up @@ -46,6 +46,7 @@ let createReadableStreamAsyncIterator;

util.inherits(Readable, Stream);

const { errorOrDestroy } = destroyImpl;
const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];

function prependListener(emitter, event, fn) {
Expand Down Expand Up @@ -117,6 +118,9 @@ function ReadableState(options, stream, isDuplex) {
// Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false;

// Should .destroy() be called after 'end' (and potentially 'finish')
this.autoDestroy = !!options.autoDestroy;

// has it been destroyed
this.destroyed = false;

Expand Down Expand Up @@ -235,7 +239,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
if (!skipChunkCheck)
er = chunkInvalid(state, chunk);
if (er) {
stream.emit('error', er);
errorOrDestroy(stream, er);
} else if (state.objectMode || chunk && chunk.length > 0) {
if (typeof chunk !== 'string' &&
!state.objectMode &&
Expand All @@ -245,11 +249,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {

if (addToFront) {
if (state.endEmitted)
stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else
addChunk(stream, state, chunk, true);
} else if (state.ended) {
stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF());
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
} else if (state.destroyed) {
return false;
} else {
Expand Down Expand Up @@ -581,7 +585,7 @@ function maybeReadMore_(stream, state) {
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
Readable.prototype._read = function(n) {
this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
};

Readable.prototype.pipe = function(dest, pipeOpts) {
Expand Down Expand Up @@ -687,7 +691,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
unpipe();
dest.removeListener('error', onerror);
if (EE.listenerCount(dest, 'error') === 0)
dest.emit('error', er);
errorOrDestroy(dest, er);
}

// Make sure our error handler is attached before userland ones.
Expand Down Expand Up @@ -1092,5 +1096,14 @@ function endReadableNT(state, stream) {
state.endEmitted = true;
stream.readable = false;
stream.emit('end');

if (state.autoDestroy) {
// In case of duplex streams we need a way to detect
// if the writable side is ready for autoDestroy as well
const wState = stream._writableState;
if (!wState || (wState.autoDestroy && wState.finished)) {
stream.destroy();
}
}
}
}
26 changes: 20 additions & 6 deletions lib/_stream_writable.js
Expand Up @@ -45,6 +45,8 @@ const {
ERR_UNKNOWN_ENCODING
} = require('internal/errors').codes;

const { errorOrDestroy } = destroyImpl;

util.inherits(Writable, Stream);

function nop() {}
Expand Down Expand Up @@ -147,6 +149,9 @@ function WritableState(options, stream, isDuplex) {
// Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false;

// Should .destroy() be called after 'finish' (and potentially 'end')
this.autoDestroy = !!options.autoDestroy;

// count buffered requests
this.bufferedRequestCount = 0;

Expand Down Expand Up @@ -235,14 +240,14 @@ function Writable(options) {

// Otherwise people can pipe Writable streams, which is just wrong.
Writable.prototype.pipe = function() {
this.emit('error', new ERR_STREAM_CANNOT_PIPE());
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
};


function writeAfterEnd(stream, cb) {
var er = new ERR_STREAM_WRITE_AFTER_END();
// TODO: defer error events consistently everywhere, not just the cb
stream.emit('error', er);
errorOrDestroy(stream, er);
process.nextTick(cb, er);
}

Expand All @@ -258,7 +263,7 @@ function validChunk(stream, state, chunk, cb) {
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
}
if (er) {
stream.emit('error', er);
errorOrDestroy(stream, er);
process.nextTick(cb, er);
return false;
}
Expand Down Expand Up @@ -422,13 +427,13 @@ function onwriteError(stream, state, sync, er, cb) {
// after error
process.nextTick(finishMaybe, stream, state);
stream._writableState.errorEmitted = true;
stream.emit('error', er);
errorOrDestroy(stream, er);
} else {
// the caller expect this to happen before if
// it is async
cb(er);
stream._writableState.errorEmitted = true;
stream.emit('error', er);
errorOrDestroy(stream, er);
// this can emit finish, but finish must
// always follow error
finishMaybe(stream, state);
Expand Down Expand Up @@ -612,7 +617,7 @@ function callFinal(stream, state) {
stream._final((err) => {
state.pendingcb--;
if (err) {
stream.emit('error', err);
errorOrDestroy(stream, err);
}
state.prefinished = true;
stream.emit('prefinish');
Expand All @@ -639,6 +644,15 @@ function finishMaybe(stream, state) {
if (state.pendingcb === 0) {
state.finished = true;
stream.emit('finish');

if (state.autoDestroy) {
// In case of duplex streams we need a way to detect
// if the readable side is ready for autoDestroy as well
const rState = stream._readableState;
if (!rState || (rState.autoDestroy && rState.endEmitted)) {
stream.destroy();
}
}
}
}
return need;
Expand Down
20 changes: 19 additions & 1 deletion lib/internal/streams/destroy.js
Expand Up @@ -82,7 +82,25 @@ function emitErrorNT(self, err) {
self.emit('error', err);
}

function errorOrDestroy(stream, err) {
// We have tests that rely on errors being emitted
// in the same tick, so changing this is semver major.
// For now when you opt-in to autoDestroy we allow
// the error to be emitted nextTick. In a future
// semver major update we should change the default to this.

const rState = stream._readableState;
const wState = stream._writableState;

if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy))
stream.destroy(err);
else
stream.emit('error', err);
}


module.exports = {
destroy,
undestroy
undestroy,
errorOrDestroy
};
84 changes: 84 additions & 0 deletions test/parallel/test-stream-auto-destroy.js
@@ -0,0 +1,84 @@
'use strict';
const common = require('../common');
const stream = require('stream');
const assert = require('assert');

{
const r = new stream.Readable({
autoDestroy: true,
read() {
this.push('hello');
this.push('world');
this.push(null);
},
destroy: common.mustCall((err, cb) => cb())
});

let ended = false;

r.resume();

r.on('end', common.mustCall(() => {
ended = true;
}));

r.on('close', common.mustCall(() => {
assert(ended);
}));
}

{
const w = new stream.Writable({
autoDestroy: true,
write(data, enc, cb) {
cb(null);
},
destroy: common.mustCall((err, cb) => cb())
});

let finished = false;

w.write('hello');
w.write('world');
w.end();

w.on('finish', common.mustCall(() => {
finished = true;
}));

w.on('close', common.mustCall(() => {
assert(finished);
}));
}

{
const t = new stream.Transform({
autoDestroy: true,
transform(data, enc, cb) {
cb(null, data);
},
destroy: common.mustCall((err, cb) => cb())
});

let ended = false;
let finished = false;

t.write('hello');
t.write('world');
t.end();

t.resume();

t.on('end', common.mustCall(() => {
ended = true;
}));

t.on('finish', common.mustCall(() => {
finished = true;
}));

t.on('close', common.mustCall(() => {
assert(ended);
assert(finished);
}));
}

0 comments on commit d6bcf8b

Please sign in to comment.