From 194789f25b7f1b6c2b40f6a48ab24aa9b18521ff Mon Sep 17 00:00:00 2001
From: Matteo Collina
Date: Mon, 9 Dec 2019 20:44:12 +0100
Subject: [PATCH] stream: make all streams error in a pipeline

This change makes all streams in a pipeline emit 'error' in case of
an abnormal termination of the pipeline. If the last stream is
currently being async iterated, this change will make the iteration
reject accordingly.

See: https://github.com/nodejs/node/pull/30861
Fixes: https://github.com/nodejs/node/issues/28194

PR-URL: https://github.com/nodejs/node/pull/30869
Reviewed-By: Luigi Pinca
Reviewed-By: Rich Trott
---
 lib/internal/streams/pipeline.js              | 24 ++++++++++----
 .../test-stream-pipeline-async-iterator.js    | 31 +++++++++++++++++++
 test/parallel/test-stream-pipeline.js         |  6 ++++
 3 files changed, 55 insertions(+), 6 deletions(-)
 create mode 100644 test/parallel/test-stream-pipeline-async-iterator.js

diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js
index 0c464605106630..ed5556e5d0a600 100644
--- a/lib/internal/streams/pipeline.js
+++ b/lib/internal/streams/pipeline.js
@@ -43,15 +43,21 @@ function destroyer(stream, reading, writing, callback) {
     // request.destroy just do .end - .abort is what we want
     if (isRequest(stream)) return stream.abort();
-    if (typeof stream.destroy === 'function') return stream.destroy();
+    if (typeof stream.destroy === 'function') {
+      if (stream.req && stream._writableState === undefined) {
+        // This is a ClientRequest
+        // TODO(mcollina): backward compatible fix to avoid crashing.
+        // Possibly remove in a later semver-major change.
+        stream.req.on('error', noop);
+      }
+      return stream.destroy(err);
+    }
 
     callback(err || new ERR_STREAM_DESTROYED('pipe'));
   };
 }
 
-function call(fn) {
-  fn();
-}
+function noop() {}
 
 function pipe(from, to) {
   return from.pipe(to);
 }
@@ -81,9 +87,15 @@ function pipeline(...streams) {
     const writing = i > 0;
     return destroyer(stream, reading, writing, function(err) {
       if (!error) error = err;
-      if (err) destroys.forEach(call);
+      if (err) {
+        for (const destroy of destroys) {
+          destroy(err);
+        }
+      }
       if (reading) return;
-      destroys.forEach(call);
+      for (const destroy of destroys) {
+        destroy();
+      }
       callback(error);
     });
   });
diff --git a/test/parallel/test-stream-pipeline-async-iterator.js b/test/parallel/test-stream-pipeline-async-iterator.js
new file mode 100644
index 00000000000000..06a2ed6ca877f8
--- /dev/null
+++ b/test/parallel/test-stream-pipeline-async-iterator.js
@@ -0,0 +1,31 @@
+'use strict';
+
+const common = require('../common');
+const { Readable, PassThrough, pipeline } = require('stream');
+const assert = require('assert');
+
+const _err = new Error('kaboom');
+
+async function run() {
+  const source = new Readable({
+    read() {
+    }
+  });
+  source.push('hello');
+  source.push('world');
+
+  setImmediate(() => { source.destroy(_err); });
+
+  const iterator = pipeline(
+    source,
+    new PassThrough(),
+    () => {});
+
+  iterator.setEncoding('utf8');
+
+  for await (const k of iterator) {
+    assert.strictEqual(k, 'helloworld');
+  }
+}
+
+run().catch(common.mustCall((err) => assert.strictEqual(err, _err)));
diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js
index ef5a39fddd881f..4a41f053bd0a85 100644
--- a/test/parallel/test-stream-pipeline.js
+++ b/test/parallel/test-stream-pipeline.js
@@ -119,6 +119,12 @@ const { promisify } = require('util');
   transform.on('close', common.mustCall());
   write.on('close', common.mustCall());
 
+  [read, transform, write].forEach((stream) => {
+    stream.on('error', common.mustCall((err) => {
+      assert.deepStrictEqual(err, new Error('kaboom'));
+    }));
+  });
+
   const dst = pipeline(read, transform, write, common.mustCall((err) => {
     assert.deepStrictEqual(err, new Error('kaboom'));
   }));
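
Note (illustrative, not part of the patch): a minimal sketch of the user-visible
effect of this change, using made-up names (source, sink, boom). With the patch
applied, destroying any stream in a pipeline with an error destroys the other
streams with that same error, so every stream emits 'error', instead of only the
pipeline() callback being notified.

'use strict';
const { Readable, PassThrough, pipeline } = require('stream');

const boom = new Error('boom');
const source = new Readable({ read() {} });
const sink = new PassThrough();

// With this change, `sink` is destroyed with the same error as `source`
// and emits 'error' too; previously it was destroyed without an error.
source.on('error', (err) => console.log('source error:', err.message));
sink.on('error', (err) => console.log('sink error:', err.message));

pipeline(source, sink, (err) => {
  console.log('pipeline callback:', err.message); // 'boom'
});

setImmediate(() => source.destroy(boom));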