Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib: fix readable state awaitDrain increase incorrectly in recursion #27572

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 51 additions & 14 deletions lib/_stream_readable.js
Expand Up @@ -134,8 +134,10 @@ function ReadableState(options, stream, isDuplex) {
// Everything else in the universe uses 'utf8', though.
this.defaultEncoding = options.defaultEncoding || 'utf8';

// The number of writers that are awaiting a drain event in .pipe()s
this.awaitDrain = 0;
addaleax marked this conversation as resolved.
Show resolved Hide resolved
// Ref the piped dest which we need a drain event on it
// type: null | Writable | Set<Writable>
this.awaitDrainWriters = null;
this.multiAwaitDrain = false;

// If true, a maybeReadMore has been scheduled
this.readingMore = false;
Expand Down Expand Up @@ -310,7 +312,13 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {

function addChunk(stream, state, chunk, addToFront) {
if (state.flowing && state.length === 0 && !state.sync) {
state.awaitDrain = 0;
// Use the guard to avoid creating `Set()` repeatedly
// when we have multiple pipes.
if (state.multiAwaitDrain) {
state.awaitDrainWriters.clear();
} else {
state.awaitDrainWriters = null;
}
stream.emit('data', chunk);
} else {
// Update the buffer info.
Expand Down Expand Up @@ -511,7 +519,11 @@ Readable.prototype.read = function(n) {
n = 0;
} else {
state.length -= n;
state.awaitDrain = 0;
if (state.multiAwaitDrain) {
state.awaitDrainWriters.clear();
} else {
state.awaitDrainWriters = null;
}
}

if (state.length === 0) {
Expand Down Expand Up @@ -656,6 +668,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
const src = this;
const state = this._readableState;

if (state.pipes.length === 1) {
if (!state.multiAwaitDrain) {
state.multiAwaitDrain = true;
state.awaitDrainWriters = new Set(
state.awaitDrainWriters ? [state.awaitDrainWriters] : []
);
}
}

state.pipes.push(dest);
debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts);

Expand Down Expand Up @@ -709,7 +730,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
if (ondrain && state.awaitDrain &&
if (ondrain && state.awaitDrainWriters &&
(!dest._writableState || dest._writableState.needDrain))
ondrain();
}
Expand All @@ -724,16 +745,22 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
if (state.pipes.length > 0 && state.pipes.includes(dest) && !cleanedUp) {
debug('false write response, pause', state.awaitDrain);
state.awaitDrain++;
if (!cleanedUp) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
debug('false write response, pause', 0);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
debug('false write response, pause', state.awaitDrainWriters.size);
state.awaitDrainWriters.add(dest);
}
}
if (!ondrain) {
// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
ondrain = pipeOnDrain(src);
ondrain = pipeOnDrain(src, dest);
dest.on('drain', ondrain);
}
src.pause();
Expand Down Expand Up @@ -783,13 +810,23 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
return dest;
};

function pipeOnDrain(src) {
function pipeOnDrain(src, dest) {
addaleax marked this conversation as resolved.
Show resolved Hide resolved
return function pipeOnDrainFunctionResult() {
const state = src._readableState;
debug('pipeOnDrain', state.awaitDrain);
if (state.awaitDrain)
state.awaitDrain--;
if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {

// `ondrain` will call directly,
// `this` maybe not a reference to dest,
// so we use the real dest here.
if (state.awaitDrainWriters === dest) {
debug('pipeOnDrain', 1);
state.awaitDrainWriters = null;
} else if (state.multiAwaitDrain) {
debug('pipeOnDrain', state.awaitDrainWriters.size);
state.awaitDrainWriters.delete(dest);
}

if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
EE.listenerCount(src, 'data')) {
state.flowing = true;
flow(src);
}
Expand Down
@@ -0,0 +1,28 @@
'use strict';
const common = require('../common');
const { PassThrough } = require('stream');

const encode = new PassThrough({
highWaterMark: 1
});

const decode = new PassThrough({
highWaterMark: 1
});

const send = common.mustCall((buf) => {
encode.write(buf);
}, 4);

let i = 0;
const onData = common.mustCall(() => {
if (++i === 2) {
send(Buffer.from([0x3]));
send(Buffer.from([0x4]));
}
}, 4);

encode.pipe(decode).on('data', onData);

send(Buffer.from([0x1]));
send(Buffer.from([0x2]));
25 changes: 13 additions & 12 deletions test/parallel/test-stream-pipe-await-drain-manual-resume.js
Expand Up @@ -28,10 +28,10 @@ readable.pipe(writable);

readable.once('pause', common.mustCall(() => {
assert.strictEqual(
readable._readableState.awaitDrain,
1,
'Expected awaitDrain to equal 1 but instead got ' +
`${readable._readableState.awaitDrain}`
readable._readableState.awaitDrainWriters,
writable,
'Expected awaitDrainWriters to be a Writable but instead got ' +
`${readable._readableState.awaitDrainWriters}`
);
// First pause, resume manually. The next write() to writable will still
// return false, because chunks are still being buffered, so it will increase
Expand All @@ -43,10 +43,10 @@ readable.once('pause', common.mustCall(() => {

readable.once('pause', common.mustCall(() => {
assert.strictEqual(
readable._readableState.awaitDrain,
1,
'.resume() should not reset the counter but instead got ' +
`${readable._readableState.awaitDrain}`
readable._readableState.awaitDrainWriters,
writable,
'.resume() should not reset the awaitDrainWriters, but instead got ' +
`${readable._readableState.awaitDrainWriters}`
);
// Second pause, handle all chunks from now on. Once all callbacks that
// are currently queued up are handled, the awaitDrain drain counter should
Expand All @@ -65,10 +65,11 @@ readable.push(null);

writable.on('finish', common.mustCall(() => {
assert.strictEqual(
readable._readableState.awaitDrain,
0,
'awaitDrain should equal 0 after all chunks are written but instead got' +
`${readable._readableState.awaitDrain}`
readable._readableState.awaitDrainWriters,
null,
`awaitDrainWriters should be reset to null
after all chunks are written but instead got
${readable._readableState.awaitDrainWriters}`
);
// Everything okay, all chunks were written.
}));
Expand Up @@ -6,16 +6,16 @@ const assert = require('assert');
const writable = new stream.Writable({
write: common.mustCall(function(chunk, encoding, cb) {
assert.strictEqual(
readable._readableState.awaitDrain,
0
readable._readableState.awaitDrainWriters,
null,
);

if (chunk.length === 32 * 1024) { // first chunk
readable.push(Buffer.alloc(34 * 1024)); // above hwm
// We should check if awaitDrain counter is increased in the next
// tick, because awaitDrain is incremented after this method finished
process.nextTick(() => {
assert.strictEqual(readable._readableState.awaitDrain, 1);
assert.strictEqual(readable._readableState.awaitDrainWriters, writable);
});
}

Expand Down
12 changes: 6 additions & 6 deletions test/parallel/test-stream-pipe-await-drain.js
Expand Up @@ -24,10 +24,10 @@ writer1._write = common.mustCall(function(chunk, encoding, cb) {

writer1.once('chunk-received', () => {
assert.strictEqual(
reader._readableState.awaitDrain,
reader._readableState.awaitDrainWriters.size,
0,
'awaitDrain initial value should be 0, actual is ' +
reader._readableState.awaitDrain
reader._readableState.awaitDrainWriters
);
setImmediate(() => {
// This one should *not* get through to writer1 because writer2 is not
Expand All @@ -39,10 +39,10 @@ writer1.once('chunk-received', () => {
// A "slow" consumer:
writer2._write = common.mustCall((chunk, encoding, cb) => {
assert.strictEqual(
reader._readableState.awaitDrain,
reader._readableState.awaitDrainWriters.size,
1,
'awaitDrain should be 1 after first push, actual is ' +
reader._readableState.awaitDrain
reader._readableState.awaitDrainWriters
);
// Not calling cb here to "simulate" slow stream.
// This should be called exactly once, since the first .write() call
Expand All @@ -51,10 +51,10 @@ writer2._write = common.mustCall((chunk, encoding, cb) => {

writer3._write = common.mustCall((chunk, encoding, cb) => {
assert.strictEqual(
reader._readableState.awaitDrain,
reader._readableState.awaitDrainWriters.size,
2,
'awaitDrain should be 2 after second push, actual is ' +
reader._readableState.awaitDrain
reader._readableState.awaitDrainWriters
);
// Not calling cb here to "simulate" slow stream.
// This should be called exactly once, since the first .write() call
Expand Down
1 change: 0 additions & 1 deletion test/parallel/test-stream2-basic.js
Expand Up @@ -355,7 +355,6 @@ class TestWriter extends EE {
assert.strictEqual(v, null);

const w = new R();

w.write = function(buffer) {
written = true;
assert.strictEqual(ended, false);
Expand Down