diff --git a/doc/api/stream.md b/doc/api/stream.md index cf03163218bfbc..b2f121f62604e0 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1525,6 +1525,11 @@ Especially useful in error handling scenarios where a stream is destroyed prematurely (like an aborted HTTP request), and will not emit `'end'` or `'finish'`. +`stream.finished()` will error with `ERR_STREAM_PREMATURE_CLOSE` if: +* `Writable` emits `'close'` before `'finish'` and +[`writable.writableFinished`][]. +* `Readable` emits `'close'` before `'end'` and [`readable.readableEnded`][]. + The `finished` API is promisify-able as well; ```js @@ -1647,6 +1652,10 @@ run().catch(console.error); * `Readable` streams which have emitted `'end'` or `'close'`. * `Writable` streams which have emitted `'finish'` or `'close'`. +If any `Writable` or `Readable` stream emits `'close'` without being able to +fully flush or drain, `stream.pipeline()` will error with +`ERR_STREAM_PREMATURE_CLOSE`. + `stream.pipeline()` leaves dangling event listeners on the streams after the `callback` has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors. @@ -2865,6 +2874,7 @@ contain multi-byte characters. [`process.stdout`]: process.html#process_process_stdout [`readable._read()`]: #stream_readable_read_size_1 [`readable.push('')`]: #stream_readable_push +[`readable.readableEnded`]: #stream_readable_readableended [`readable.setEncoding()`]: #stream_readable_setencoding_encoding [`stream.Readable.from()`]: #stream_stream_readable_from_iterable_options [`stream.cork()`]: #stream_writable_cork diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 585a52ae2f2382..dd42eb9f5b3f46 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -79,10 +79,24 @@ function eos(stream, opts, callback) { }; const onclose = () => { - if (readable && !readableEnded) { - callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); - } else if (writable && !writableFinished) { - callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + if (readable) { + const ended = (stream._readableState && + stream._readableState.endEmitted) || stream.readableEnded; + if (!ended && !readableEnded) { + callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + } else if (!readableEnded) { + // Compat. Stream has ended but haven't emitted 'end'. + callback.call(stream); + } + } else if (writable) { + const finished = (stream._writableState && + stream._writableState.finished) || stream.writableFinished; + if (!finished && !writableFinished) { + callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + } else if (!writableFinished) { + // Compat. Stream has finished but haven't emitted 'finish'. + callback.call(stream); + } } }; diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index e0834171bfb8fc..9a3521f523b54e 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -46,6 +46,29 @@ function destroyer(stream, reading, writing, callback) { if (eos === undefined) eos = require('internal/streams/end-of-stream'); eos(stream, { readable: reading, writable: writing }, (err) => { + if ( + err && + err.code === 'ERR_STREAM_PREMATURE_CLOSE' && + reading && + (stream._readableState && stream._readableState.ended) + ) { + // Some readable streams will emit 'close' before 'end'. However, since + // this is on the readable side 'end' should still be emitted if the + // stream has been ended and no error emitted. This should be allowed in + // favor of backwards compatibility. Since the stream is piped to a + // destination this should not result in any observable difference. + // We don't need to check if this is a writable premature close since + // eos will only fail with premature close on the reading side for + // duplex streams. + stream + .on('end', () => { + closed = true; + callback(); + }) + .on('error', callback); + return; + } + if (err) return callback(err); closed = true; callback(); diff --git a/test/parallel/test-http-client-finished.js b/test/parallel/test-http-client-finished.js index 337f7b596d7442..8eddf8660b4476 100644 --- a/test/parallel/test-http-client-finished.js +++ b/test/parallel/test-http-client-finished.js @@ -55,7 +55,7 @@ const { finished } = require('stream'); }).end(); finished(req, (err) => { common.expectsError({ - type: Error, + name: 'Error', code: 'ERR_STREAM_PREMATURE_CLOSE' })(err); finished(req, common.mustCall(() => { diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index c5e792b45e2e4b..caca55601868a2 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -273,27 +273,26 @@ const { promisify } = require('util'); } { - // Premature close if stream never emitted 'finish' - // even if writableFinished says something else. + // No error if stream never emitted 'end' + // even if readableEnded says something else. const streamLike = new EE(); streamLike.writable = true; - finished(streamLike, common.expectsError({ - code: 'ERR_STREAM_PREMATURE_CLOSE' + finished(streamLike, common.mustCall((err) => { + assert.strictEqual(err, undefined); })); streamLike.writableFinished = true; streamLike.emit('close'); } - { - // Premature close if stream never emitted 'end' - // even if readableEnded says something else. + // No error if stream never emitted 'finished' + // even if writableFinished says something else. const streamLike = new EE(); streamLike.readable = true; - finished(streamLike, common.expectsError({ - code: 'ERR_STREAM_PREMATURE_CLOSE' + finished(streamLike, common.mustCall((err) => { + assert.strictEqual(err, undefined); })); streamLike.readableEnded = true; streamLike.emit('close'); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 19fc246e2bf3cd..99876738415c69 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -912,4 +912,35 @@ const { promisify } = require('util'); }, common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); })); + // Make sure 'close' before 'end' finishes without error + // if readable has received eof. + // Ref: https://github.com/nodejs/node/issues/29699 + const r = new Readable(); + const w = new Writable({ + write(chunk, encoding, cb) { + cb(); + } + }); + pipeline(r, w, (err) => { + assert.strictEqual(err, undefined); + }); + r.push(null); + r.destroy(); +} + +{ + // Make sure 'close' before 'end' finishes without error + // if readable has received eof. + // Ref: https://github.com/nodejs/node/issues/29699 + const r = new Readable(); + const w = new Writable({ + write(chunk, encoding, cb) { + cb(); + } + }); + pipeline(r, w, (err) => { + assert.strictEqual(err, undefined); + }); + r.push(null); + r.emit('close'); }