diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index df0d7bc85d366c..94d5256c20095c 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -36,7 +36,7 @@ function destroyStream(stream, err) { if (typeof stream.close === 'function') return stream.close(); } -function destroyer(stream, reading, writing, callback) { +function destroyer(stream, reading, writing, final, callback) { callback = once(callback); let destroyed = false; @@ -44,7 +44,10 @@ function destroyer(stream, reading, writing, callback) { eos(stream, { readable: reading, writable: writing }, (err) => { if (destroyed) return; destroyed = true; - destroyStream(stream, err); + const readable = stream.readable || isRequest(stream); + if (err || !final || !readable) { + destroyStream(stream, err); + } callback(err); }); @@ -176,7 +179,7 @@ function pipeline(...streams) { } function wrap(stream, reading, writing, final) { - destroys.push(destroyer(stream, reading, writing, (err) => { + destroys.push(destroyer(stream, reading, writing, final, (err) => { finish(err, final); })); } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 6bfa1331834968..0d90f65f5a6643 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -918,6 +918,52 @@ const { promisify } = require('util'); const dst = new PassThrough({ autoDestroy: false }); pipeline(src, dst, common.mustCall(() => { assert.strictEqual(src.destroyed, true); + assert.strictEqual(dst.destroyed, false); + })); + src.end(); +} + +{ + const server = http.createServer((req, res) => { + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port + }); + + const body = new PassThrough(); + pipeline( + body, + req, + common.mustCall((err) => { + assert(!err); + assert(!req.res); + assert(!req.aborted); + req.abort(); + server.close(); + }) + ); + body.end(); + }); +} + +{ + const src = new PassThrough(); + const dst = new PassThrough(); + pipeline(src, dst, common.mustCall((err) => { + assert(!err); + assert.strictEqual(dst.destroyed, false); + })); + src.end(); +} + +{ + const src = new PassThrough(); + const dst = new PassThrough(); + dst.readable = false; + pipeline(src, dst, common.mustCall((err) => { + assert(!err); assert.strictEqual(dst.destroyed, true); })); src.end();