From f750a74a07192bbc96d029a0693ca8409f15bbf6 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 18 Sep 2020 15:38:57 +0200 Subject: [PATCH 1/2] stream: fix legacy pipe error handling Fixes: https://github.com/nodejs/node/issues/35237 --- lib/internal/streams/legacy.js | 27 ++++++++++++++++--- lib/internal/streams/readable.js | 21 +-------------- lib/internal/streams/writable.js | 2 +- lib/stream.js | 2 +- .../test-stream-pipe-error-handling.js | 13 ++++++++- 5 files changed, 38 insertions(+), 27 deletions(-) diff --git a/lib/internal/streams/legacy.js b/lib/internal/streams/legacy.js index 2bc7a86aa050b6..0a0d0571c46378 100644 --- a/lib/internal/streams/legacy.js +++ b/lib/internal/streams/legacy.js @@ -1,6 +1,7 @@ 'use strict'; const { + ArrayIsArray, ObjectSetPrototypeOf, } = primordials; @@ -58,12 +59,12 @@ Stream.prototype.pipe = function(dest, options) { function onerror(er) { cleanup(); if (EE.listenerCount(this, 'error') === 0) { - throw er; // Unhandled stream error in pipe. + this.emit('error', er); } } - source.on('error', onerror); - dest.on('error', onerror); + prependListener(source, 'error', onerror); + prependListener(dest, 'error', onerror); // Remove all the event listeners that were added. function cleanup() { @@ -92,4 +93,22 @@ Stream.prototype.pipe = function(dest, options) { return dest; }; -module.exports = Stream; +function prependListener(emitter, event, fn) { + // Sadly this is not cacheable as some libraries bundle their own + // event emitter implementation with them. + if (typeof emitter.prependListener === 'function') + return emitter.prependListener(event, fn); + + // This is a hack to make sure that our error handler is attached before any + // userland ones. NEVER DO THIS. This is here only because this code needs + // to continue to work with older versions of Node.js that do not include + // the prependListener() method. The goal is to eventually remove this hack. + if (!emitter._events || !emitter._events[event]) + emitter.on(event, fn); + else if (ArrayIsArray(emitter._events[event])) + emitter._events[event].unshift(fn); + else + emitter._events[event] = [fn, emitter._events[event]]; +} + +module.exports = { Stream, prependListener }; diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index abd7faf59f2723..b2a6ddfb29e9e6 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -22,7 +22,6 @@ 'use strict'; const { - ArrayIsArray, NumberIsInteger, NumberIsNaN, ObjectDefineProperties, @@ -38,7 +37,7 @@ module.exports = Readable; Readable.ReadableState = ReadableState; const EE = require('events'); -const Stream = require('internal/streams/legacy'); +const { Stream, prependListener } = require('internal/streams/legacy'); const { Buffer } = require('buffer'); let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { @@ -69,24 +68,6 @@ function nop() {} const { errorOrDestroy } = destroyImpl; -function prependListener(emitter, event, fn) { - // Sadly this is not cacheable as some libraries bundle their own - // event emitter implementation with them. - if (typeof emitter.prependListener === 'function') - return emitter.prependListener(event, fn); - - // This is a hack to make sure that our error handler is attached before any - // userland ones. NEVER DO THIS. This is here only because this code needs - // to continue to work with older versions of Node.js that do not include - // the prependListener() method. The goal is to eventually remove this hack. - if (!emitter._events || !emitter._events[event]) - emitter.on(event, fn); - else if (ArrayIsArray(emitter._events[event])) - emitter._events[event].unshift(fn); - else - emitter._events[event] = [fn, emitter._events[event]]; -} - function ReadableState(options, stream, isDuplex) { // Duplex streams are both readable and writable, but share // the same options object. diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index e3d13dc1bfc675..cf99b97a94ef6f 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -38,7 +38,7 @@ module.exports = Writable; Writable.WritableState = WritableState; const EE = require('events'); -const Stream = require('internal/streams/legacy'); +const Stream = require('internal/streams/legacy').Stream; const { Buffer } = require('buffer'); const destroyImpl = require('internal/streams/destroy'); const { diff --git a/lib/stream.js b/lib/stream.js index 9fff2e9d5c29c3..11f5ca997fef34 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -36,7 +36,7 @@ const internalBuffer = require('internal/buffer'); // Lazy loaded let promises = null; -const Stream = module.exports = require('internal/streams/legacy'); +const Stream = module.exports = require('internal/streams/legacy').Stream; Stream.Readable = require('internal/streams/readable'); Stream.Writable = require('internal/streams/writable'); Stream.Duplex = require('internal/streams/duplex'); diff --git a/test/parallel/test-stream-pipe-error-handling.js b/test/parallel/test-stream-pipe-error-handling.js index 027d4ddf3c0079..ea10de14b8240f 100644 --- a/test/parallel/test-stream-pipe-error-handling.js +++ b/test/parallel/test-stream-pipe-error-handling.js @@ -22,7 +22,7 @@ 'use strict'; const common = require('../common'); const assert = require('assert'); -const Stream = require('stream').Stream; +const { Stream, PassThrough } = require('stream'); { const source = new Stream(); @@ -108,3 +108,14 @@ const Stream = require('stream').Stream; w.removeListener('error', () => {}); removed = true; } + +{ + const destination = new PassThrough(); + destination.once('error', common.mustCall()); + + const stream = new Stream(); + stream + .pipe(destination); + + destination.destroy(new Error('this should be handled')); +} From c215a0e066f04ab12cd85ab0b975e1f50521f425 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 22 Sep 2020 22:54:58 +0200 Subject: [PATCH 2/2] fixup --- test/parallel/test-stream-pipe-error-handling.js | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/test/parallel/test-stream-pipe-error-handling.js b/test/parallel/test-stream-pipe-error-handling.js index ea10de14b8240f..cf3a3699d0975d 100644 --- a/test/parallel/test-stream-pipe-error-handling.js +++ b/test/parallel/test-stream-pipe-error-handling.js @@ -110,12 +110,15 @@ const { Stream, PassThrough } = require('stream'); } { + const _err = new Error('this should be handled'); const destination = new PassThrough(); - destination.once('error', common.mustCall()); + destination.once('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + })); const stream = new Stream(); stream .pipe(destination); - destination.destroy(new Error('this should be handled')); + destination.destroy(_err); }