Skip to content

Commit

Permalink
stream: cleanup() when unpiping all streams.
Browse files Browse the repository at this point in the history
This PR makes sure the object emitted as the 'unpipe'
event in the destination stream is not shared between
destination, as it would be muted.

Refs: #12746
PR-URL: #18266
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
陈刚 authored and mcollina committed Feb 5, 2018
1 parent 83c9315 commit a52b7a2
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
2 changes: 1 addition & 1 deletion lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ Readable.prototype.unpipe = function(dest) {
state.flowing = false;

for (var i = 0; i < len; i++)
dests[i].emit('unpipe', this, unpipeInfo);
dests[i].emit('unpipe', this, { hasUnpiped: false });
return this;
}

Expand Down
54 changes: 54 additions & 0 deletions test/parallel/test-stream-pipe-unpipe-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,57 @@ source.unpipe(dest2);
source.unpipe(dest1);

assert.strictEqual(source._readableState.pipes, null);

{
// test `cleanup()` if we unpipe all streams.
const source = Readable({ read: () => {} });
const dest1 = Writable({ write: () => {} });
const dest2 = Writable({ write: () => {} });

let destCount = 0;
const srcCheckEventNames = ['end', 'data'];
const destCheckEventNames = ['close', 'finish', 'drain', 'error', 'unpipe'];

const checkSrcCleanup = common.mustCall(() => {
assert.strictEqual(source._readableState.pipes, null);
assert.strictEqual(source._readableState.pipesCount, 0);
assert.strictEqual(source._readableState.flowing, false);

srcCheckEventNames.forEach((eventName) => {
assert.strictEqual(
source.listenerCount(eventName), 0,
`source's '${eventName}' event listeners not removed`
);
});
});

function checkDestCleanup(dest) {
const currentDestId = ++destCount;
source.pipe(dest);

const unpipeChecker = common.mustCall(() => {
assert.deepStrictEqual(
dest.listeners('unpipe'), [unpipeChecker],
`destination{${currentDestId}} should have a 'unpipe' event ` +
'listener which is `unpipeChecker`'
);
dest.removeListener('unpipe', unpipeChecker);
destCheckEventNames.forEach((eventName) => {
assert.strictEqual(
dest.listenerCount(eventName), 0,
`destination{${currentDestId}}'s '${eventName}' event ` +
'listeners not removed'
);
});

if (--destCount === 0)
checkSrcCleanup();
});

dest.on('unpipe', unpipeChecker);
}

checkDestCleanup(dest1);
checkDestCleanup(dest2);
source.unpipe();
}

0 comments on commit a52b7a2

Please sign in to comment.