diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index fc0baca1b81235..a466011b682beb 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -66,7 +66,6 @@ ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Readable, Stream); const { errorOrDestroy } = destroyImpl; -const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume']; function prependListener(emitter, event, fn) { // Sadly this is not cacheable as some libraries bundle their own @@ -1034,10 +1033,29 @@ Readable.prototype.wrap = function(stream) { } } - // Proxy certain important events. - for (const kProxyEvent of kProxyEvents) { - stream.on(kProxyEvent, this.emit.bind(this, kProxyEvent)); - } + stream.on('error', (err) => { + errorOrDestroy(this, err); + }); + + stream.on('close', () => { + // TODO(ronag): Update readable state? + this.emit('close'); + }); + + stream.on('destroy', () => { + // TODO(ronag): this.destroy()? + this.emit('destroy'); + }); + + stream.on('pause', () => { + // TODO(ronag): this.pause()? + this.emit('pause'); + }); + + stream.on('resume', () => { + // TODO(ronag): this.resume()? + this.emit('resume'); + }); // When we try to consume some more bytes, simply unpause the // underlying stream. diff --git a/test/parallel/test-stream2-readable-wrap-error.js b/test/parallel/test-stream2-readable-wrap-error.js new file mode 100644 index 00000000000000..b56b9bc41c7527 --- /dev/null +++ b/test/parallel/test-stream2-readable-wrap-error.js @@ -0,0 +1,32 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const Readable = require('_stream_readable'); +const EE = require('events').EventEmitter; + +const oldStream = new EE(); +oldStream.pause = () => {}; +oldStream.resume = () => {}; + +{ + const r = new Readable({ autoDestroy: true }) + .wrap(oldStream) + .on('error', common.mustCall(() => { + assert.strictEqual(r._readableState.errorEmitted, true); + assert.strictEqual(r._readableState.errored, true); + assert.strictEqual(r.destroyed, true); + })); + oldStream.emit('error', new Error()); +} + +{ + const r = new Readable({ autoDestroy: false }) + .wrap(oldStream) + .on('error', common.mustCall(() => { + assert.strictEqual(r._readableState.errorEmitted, true); + assert.strictEqual(r._readableState.errored, true); + assert.strictEqual(r.destroyed, false); + })); + oldStream.emit('error', new Error()); +}