Skip to content

Commit

Permalink
stream: cleanup() when unpiping all streams.
Browse files Browse the repository at this point in the history
This PR makes sure the object emitted as the 'unpipe'
event in the destination stream is not shared between
destination, as it would be muted.

Refs: #12746
PR-URL: #18266
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
陈刚 authored and gibfahn committed Apr 13, 2018
1 parent 25db460 commit 10231a9
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
2 changes: 1 addition & 1 deletion lib/_stream_readable.js
Expand Up @@ -747,7 +747,7 @@ Readable.prototype.unpipe = function(dest) {
state.flowing = false;

for (var i = 0; i < len; i++)
dests[i].emit('unpipe', this, unpipeInfo);
dests[i].emit('unpipe', this, { hasUnpiped: false });
return this;
}

Expand Down
54 changes: 54 additions & 0 deletions test/parallel/test-stream-pipe-unpipe-streams.js
Expand Up @@ -31,3 +31,57 @@ source.unpipe(dest2);
source.unpipe(dest1);

assert.strictEqual(source._readableState.pipes, null);

{
// test `cleanup()` if we unpipe all streams.
const source = Readable({ read: () => {} });
const dest1 = Writable({ write: () => {} });
const dest2 = Writable({ write: () => {} });

let destCount = 0;
const srcCheckEventNames = ['end', 'data'];
const destCheckEventNames = ['close', 'finish', 'drain', 'error', 'unpipe'];

const checkSrcCleanup = common.mustCall(() => {
assert.strictEqual(source._readableState.pipes, null);
assert.strictEqual(source._readableState.pipesCount, 0);
assert.strictEqual(source._readableState.flowing, false);

srcCheckEventNames.forEach((eventName) => {
assert.strictEqual(
source.listenerCount(eventName), 0,
`source's '${eventName}' event listeners not removed`
);
});
});

function checkDestCleanup(dest) {
const currentDestId = ++destCount;
source.pipe(dest);

const unpipeChecker = common.mustCall(() => {
assert.deepStrictEqual(
dest.listeners('unpipe'), [unpipeChecker],
`destination{${currentDestId}} should have a 'unpipe' event ` +
'listener which is `unpipeChecker`'
);
dest.removeListener('unpipe', unpipeChecker);
destCheckEventNames.forEach((eventName) => {
assert.strictEqual(
dest.listenerCount(eventName), 0,
`destination{${currentDestId}}'s '${eventName}' event ` +
'listeners not removed'
);
});

if (--destCount === 0)
checkSrcCleanup();
});

dest.on('unpipe', unpipeChecker);
}

checkDestCleanup(dest1);
checkDestCleanup(dest2);
source.unpipe();
}

0 comments on commit 10231a9

Please sign in to comment.