diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 0c464605106630..ed5556e5d0a600 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -43,15 +43,21 @@ function destroyer(stream, reading, writing, callback) { // request.destroy just do .end - .abort is what we want if (isRequest(stream)) return stream.abort(); - if (typeof stream.destroy === 'function') return stream.destroy(); + if (typeof stream.destroy === 'function') { + if (stream.req && stream._writableState === undefined) { + // This is a ClientRequest + // TODO(mcollina): backward compatible fix to avoid crashing. + // Possibly remove in a later semver-major change. + stream.req.on('error', noop); + } + return stream.destroy(err); + } callback(err || new ERR_STREAM_DESTROYED('pipe')); }; } -function call(fn) { - fn(); -} +function noop() {} function pipe(from, to) { return from.pipe(to); @@ -81,9 +87,15 @@ function pipeline(...streams) { const writing = i > 0; return destroyer(stream, reading, writing, function(err) { if (!error) error = err; - if (err) destroys.forEach(call); + if (err) { + for (const destroy of destroys) { + destroy(err); + } + } if (reading) return; - destroys.forEach(call); + for (const destroy of destroys) { + destroy(); + } callback(error); }); }); diff --git a/test/parallel/test-stream-pipeline-async-iterator.js b/test/parallel/test-stream-pipeline-async-iterator.js new file mode 100644 index 00000000000000..06a2ed6ca877f8 --- /dev/null +++ b/test/parallel/test-stream-pipeline-async-iterator.js @@ -0,0 +1,31 @@ +'use strict'; + +const common = require('../common'); +const { Readable, PassThrough, pipeline } = require('stream'); +const assert = require('assert'); + +const _err = new Error('kaboom'); + +async function run() { + const source = new Readable({ + read() { + } + }); + source.push('hello'); + source.push('world'); + + setImmediate(() => { source.destroy(_err); }); + + const iterator = pipeline( + source, + new PassThrough(), + () => {}); + + iterator.setEncoding('utf8'); + + for await (const k of iterator) { + assert.strictEqual(k, 'helloworld'); + } +} + +run().catch(common.mustCall((err) => assert.strictEqual(err, _err))); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index ef5a39fddd881f..4a41f053bd0a85 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -119,6 +119,12 @@ const { promisify } = require('util'); transform.on('close', common.mustCall()); write.on('close', common.mustCall()); + [read, transform, write].forEach((stream) => { + stream.on('error', common.mustCall((err) => { + assert.deepStrictEqual(err, new Error('kaboom')); + })); + }); + const dst = pipeline(read, transform, write, common.mustCall((err) => { assert.deepStrictEqual(err, new Error('kaboom')); }));