From c31e2f6b0fcdc8e24a845028202c82fee885d4d0 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 18 Sep 2020 15:38:57 +0200 Subject: [PATCH] stream: fix legacy pipe error handling Fixes: https://github.com/nodejs/node/issues/35237 PR-URL: https://github.com/nodejs/node/pull/35257 Reviewed-By: Matteo Collina Reviewed-By: Luigi Pinca Reviewed-By: Rich Trott --- lib/internal/streams/legacy.js | 27 ++++++++++++++++--- lib/internal/streams/readable.js | 21 +-------------- lib/internal/streams/writable.js | 2 +- lib/stream.js | 4 +-- .../test-stream-pipe-error-handling.js | 16 ++++++++++- 5 files changed, 41 insertions(+), 29 deletions(-) diff --git a/lib/internal/streams/legacy.js b/lib/internal/streams/legacy.js index 2bc7a86aa050b6..0a0d0571c46378 100644 --- a/lib/internal/streams/legacy.js +++ b/lib/internal/streams/legacy.js @@ -1,6 +1,7 @@ 'use strict'; const { + ArrayIsArray, ObjectSetPrototypeOf, } = primordials; @@ -58,12 +59,12 @@ Stream.prototype.pipe = function(dest, options) { function onerror(er) { cleanup(); if (EE.listenerCount(this, 'error') === 0) { - throw er; // Unhandled stream error in pipe. + this.emit('error', er); } } - source.on('error', onerror); - dest.on('error', onerror); + prependListener(source, 'error', onerror); + prependListener(dest, 'error', onerror); // Remove all the event listeners that were added. function cleanup() { @@ -92,4 +93,22 @@ Stream.prototype.pipe = function(dest, options) { return dest; }; -module.exports = Stream; +function prependListener(emitter, event, fn) { + // Sadly this is not cacheable as some libraries bundle their own + // event emitter implementation with them. + if (typeof emitter.prependListener === 'function') + return emitter.prependListener(event, fn); + + // This is a hack to make sure that our error handler is attached before any + // userland ones. NEVER DO THIS. This is here only because this code needs + // to continue to work with older versions of Node.js that do not include + // the prependListener() method. The goal is to eventually remove this hack. + if (!emitter._events || !emitter._events[event]) + emitter.on(event, fn); + else if (ArrayIsArray(emitter._events[event])) + emitter._events[event].unshift(fn); + else + emitter._events[event] = [fn, emitter._events[event]]; +} + +module.exports = { Stream, prependListener }; diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index cb13210d2b87db..fd2a8635674899 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -22,7 +22,6 @@ 'use strict'; const { - ArrayIsArray, NumberIsInteger, NumberIsNaN, NumberParseInt, @@ -38,7 +37,7 @@ module.exports = Readable; Readable.ReadableState = ReadableState; const EE = require('events'); -const Stream = require('internal/streams/legacy'); +const { Stream, prependListener } = require('internal/streams/legacy'); const { Buffer } = require('buffer'); let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { @@ -69,24 +68,6 @@ function nop() {} const { errorOrDestroy } = destroyImpl; -function prependListener(emitter, event, fn) { - // Sadly this is not cacheable as some libraries bundle their own - // event emitter implementation with them. - if (typeof emitter.prependListener === 'function') - return emitter.prependListener(event, fn); - - // This is a hack to make sure that our error handler is attached before any - // userland ones. NEVER DO THIS. This is here only because this code needs - // to continue to work with older versions of Node.js that do not include - // the prependListener() method. The goal is to eventually remove this hack. - if (!emitter._events || !emitter._events[event]) - emitter.on(event, fn); - else if (ArrayIsArray(emitter._events[event])) - emitter._events[event].unshift(fn); - else - emitter._events[event] = [fn, emitter._events[event]]; -} - function ReadableState(options, stream, isDuplex) { // Duplex streams are both readable and writable, but share // the same options object. diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 67da8061295fe0..4c85106353df63 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -38,7 +38,7 @@ module.exports = Writable; Writable.WritableState = WritableState; const EE = require('events'); -const Stream = require('internal/streams/legacy'); +const Stream = require('internal/streams/legacy').Stream; const { Buffer } = require('buffer'); const destroyImpl = require('internal/streams/destroy'); const { diff --git a/lib/stream.js b/lib/stream.js index 08898e5607fb29..5f842d1d4d1ea4 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -25,9 +25,7 @@ const pipeline = require('internal/streams/pipeline'); const eos = require('internal/streams/end-of-stream'); const internalBuffer = require('internal/buffer'); -// Note: export Stream before Readable/Writable/Duplex/... -// to avoid a cross-reference(require) issues -const Stream = module.exports = require('internal/streams/legacy'); +const Stream = module.exports = require('internal/streams/legacy').Stream; Stream.Readable = require('internal/streams/readable'); Stream.Writable = require('internal/streams/writable'); Stream.Duplex = require('internal/streams/duplex'); diff --git a/test/parallel/test-stream-pipe-error-handling.js b/test/parallel/test-stream-pipe-error-handling.js index 027d4ddf3c0079..cf3a3699d0975d 100644 --- a/test/parallel/test-stream-pipe-error-handling.js +++ b/test/parallel/test-stream-pipe-error-handling.js @@ -22,7 +22,7 @@ 'use strict'; const common = require('../common'); const assert = require('assert'); -const Stream = require('stream').Stream; +const { Stream, PassThrough } = require('stream'); { const source = new Stream(); @@ -108,3 +108,17 @@ const Stream = require('stream').Stream; w.removeListener('error', () => {}); removed = true; } + +{ + const _err = new Error('this should be handled'); + const destination = new PassThrough(); + destination.once('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + })); + + const stream = new Stream(); + stream + .pipe(destination); + + destination.destroy(_err); +}