diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 6fa3540057be33..73d18b19bb53dc 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -147,6 +147,8 @@ function ReadableState(options, stream, isDuplex) { // Indicates whether the stream has errored. this.errored = false; + this.closed = false; + // Crypto is kind of old and crusty. Historically, its default string // encoding is 'binary' so we have to make this configurable. // Everything else in the universe uses 'utf8', though. diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 3f029883705a0a..6d65abebcb030c 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -175,6 +175,8 @@ function WritableState(options, stream, isDuplex) { // is disabled we need a way to tell whether the stream has failed. this.errored = false; + this.closed = false; + // Count buffered requests this.bufferedRequestCount = 0; diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index b80fb56a6bf324..3115202ca6bdee 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -73,6 +73,13 @@ function emitCloseNT(self) { const r = self._readableState; const w = self._writableState; + if (w) { + w.closed = true; + } + if (r) { + r.closed = true; + } + if ((w && w.emitClose) || (r && r.emitClose)) { self.emit('close'); } @@ -103,6 +110,7 @@ function undestroy() { if (r) { r.destroyed = false; r.errored = false; + r.closed = false; r.reading = false; r.ended = false; r.endEmitted = false; @@ -112,6 +120,7 @@ function undestroy() { if (w) { w.destroyed = false; w.errored = false; + w.closed = false; w.ended = false; w.ending = false; w.finalCalled = false; diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index dd42eb9f5b3f46..0388533dac0167 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -44,16 +44,25 @@ function eos(stream, opts, callback) { callback.call(stream, err); }; - let writableFinished = stream.writableFinished || - (stream._writableState && stream._writableState.finished); - let readableEnded = stream.readableEnded || - (stream._readableState && stream._readableState.endEmitted); + const w = stream._writableState; + const r = stream._readableState; - if (writableFinished || readableEnded || stream.destroyed || - stream.aborted) { + let writableFinished = stream.writableFinished || (w && w.finished); + let readableEnded = stream.readableEnded || (r && r.endEmitted); + + const errorEmitted = (w && w.errorEmitted) || (r && r.errorEmitted); + const closed = (w && w.closed) || (r && r.closed); + + if (writableFinished || readableEnded || errorEmitted || closed) { + // TODO(ronag): rethrow if errorEmitted? + // TODO(ronag): premature close if closed but not + // errored, finished or ended? + + // Swallow any error past this point. if (opts.error !== false) stream.on('error', onerror); - // A destroy(err) call emits error in nextTick. + process.nextTick(callback.bind(stream)); + return () => { stream.removeListener('error', onerror); }; diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index caca55601868a2..b6db7a867606a4 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -197,10 +197,14 @@ const { promisify } = require('util'); { // Completed if readable-like is ended before. + let ticked = false; const streamLike = new EE(); streamLike.readableEnded = true; streamLike.readable = true; - finished(streamLike, common.mustCall()); + finished(streamLike, common.mustCall(() => { + assert.strictEqual(ticked, true); + })); + ticked = true; } { @@ -215,42 +219,6 @@ const { promisify } = require('util'); streamLike.emit('close'); } -{ - // Completed if writable-like is destroyed before. - - const streamLike = new EE(); - streamLike.destroyed = true; - streamLike.writable = true; - finished(streamLike, common.mustCall()); -} - -{ - // Completed if readable-like is aborted before. - - const streamLike = new EE(); - streamLike.destroyed = true; - streamLike.readable = true; - finished(streamLike, common.mustCall()); -} - -{ - // Completed if writable-like is aborted before. - - const streamLike = new EE(); - streamLike.aborted = true; - streamLike.writable = true; - finished(streamLike, common.mustCall()); -} - -{ - // Completed if readable-like is aborted before. - - const streamLike = new EE(); - streamLike.aborted = true; - streamLike.readable = true; - finished(streamLike, common.mustCall()); -} - { // Completed if streamlike is finished before.