From 2ad3ec317de9707071c40795b107a7823df02300 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 28 Jun 2020 18:38:20 +0200 Subject: [PATCH] stream: destroy wrapped streams on error Stream should be destroyed and update state accordingly when the wrapped stream emits error. Does some additional cleanup with future TODOs that might be worth looking into. PR-URL: https://github.com/nodejs/node/pull/34102 Reviewed-By: Matteo Collina Reviewed-By: Luigi Pinca Reviewed-By: Anna Henningsen --- lib/_stream_readable.js | 28 +++++++++++++--- .../test-stream2-readable-wrap-error.js | 32 +++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-stream2-readable-wrap-error.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index fc0baca1b81235..a466011b682beb 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -66,7 +66,6 @@ ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Readable, Stream); const { errorOrDestroy } = destroyImpl; -const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume']; function prependListener(emitter, event, fn) { // Sadly this is not cacheable as some libraries bundle their own @@ -1034,10 +1033,29 @@ Readable.prototype.wrap = function(stream) { } } - // Proxy certain important events. - for (const kProxyEvent of kProxyEvents) { - stream.on(kProxyEvent, this.emit.bind(this, kProxyEvent)); - } + stream.on('error', (err) => { + errorOrDestroy(this, err); + }); + + stream.on('close', () => { + // TODO(ronag): Update readable state? + this.emit('close'); + }); + + stream.on('destroy', () => { + // TODO(ronag): this.destroy()? + this.emit('destroy'); + }); + + stream.on('pause', () => { + // TODO(ronag): this.pause()? + this.emit('pause'); + }); + + stream.on('resume', () => { + // TODO(ronag): this.resume()? + this.emit('resume'); + }); // When we try to consume some more bytes, simply unpause the // underlying stream. diff --git a/test/parallel/test-stream2-readable-wrap-error.js b/test/parallel/test-stream2-readable-wrap-error.js new file mode 100644 index 00000000000000..b56b9bc41c7527 --- /dev/null +++ b/test/parallel/test-stream2-readable-wrap-error.js @@ -0,0 +1,32 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const Readable = require('_stream_readable'); +const EE = require('events').EventEmitter; + +const oldStream = new EE(); +oldStream.pause = () => {}; +oldStream.resume = () => {}; + +{ + const r = new Readable({ autoDestroy: true }) + .wrap(oldStream) + .on('error', common.mustCall(() => { + assert.strictEqual(r._readableState.errorEmitted, true); + assert.strictEqual(r._readableState.errored, true); + assert.strictEqual(r.destroyed, true); + })); + oldStream.emit('error', new Error()); +} + +{ + const r = new Readable({ autoDestroy: false }) + .wrap(oldStream) + .on('error', common.mustCall(() => { + assert.strictEqual(r._readableState.errorEmitted, true); + assert.strictEqual(r._readableState.errored, true); + assert.strictEqual(r.destroyed, false); + })); + oldStream.emit('error', new Error()); +}