From 12253f8c74c5c3106f81fae3eed4b8f2e062e66a Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 5 Jan 2020 11:55:34 +0100 Subject: [PATCH] stream: sync stream unpipe resume pipe() ondata should not control flow state if cleaned up. Fixes: https://github.com/nodejs/node/issues/31190 Backport-PR-URL: https://github.com/nodejs/node/pull/32264 PR-URL: https://github.com/nodejs/node/pull/31191 Reviewed-By: Anna Henningsen Reviewed-By: Luigi Pinca Reviewed-By: Matteo Collina Reviewed-By: Rich Trott Backport-PR-URL: https://github.com/nodejs/node/pull/32264 --- lib/_stream_readable.js | 4 +++- .../test-stream-readable-unpipe-resume.js | 20 +++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) create mode 100644 test/parallel/test-stream-readable-unpipe-resume.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index c5b62582aebad7..9f53da30b8ad72 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -705,6 +705,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) { debug('false write response, pause', state.awaitDrain); state.awaitDrain++; } + if (!cleanedUp) { + src.pause(); + } if (!ondrain) { // When the dest drains, it reduces the awaitDrain counter // on the source. This would be more elegant with a .once() @@ -713,7 +716,6 @@ Readable.prototype.pipe = function(dest, pipeOpts) { ondrain = pipeOnDrain(src); dest.on('drain', ondrain); } - src.pause(); } } diff --git a/test/parallel/test-stream-readable-unpipe-resume.js b/test/parallel/test-stream-readable-unpipe-resume.js new file mode 100644 index 00000000000000..b40f724bccfc83 --- /dev/null +++ b/test/parallel/test-stream-readable-unpipe-resume.js @@ -0,0 +1,20 @@ +'use strict'; + +const common = require('../common'); +const stream = require('stream'); +const fs = require('fs'); + +const readStream = fs.createReadStream(process.execPath); + +const transformStream = new stream.Transform({ + transform: common.mustCall(() => { + readStream.unpipe(); + readStream.resume(); + }) +}); + +readStream.on('end', common.mustCall()); + +readStream + .pipe(transformStream) + .resume();