Skip to content

Commit

Permalink
stream: avoid drain for sync streams
Browse files Browse the repository at this point in the history
Previously a sync writable receiving chunks
larger than highwatermark would unecessarily
ping pong needDrain.
  • Loading branch information
ronag committed Apr 16, 2020
1 parent 4a6a5c3 commit abc1d0b
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 16 deletions.
7 changes: 4 additions & 3 deletions benchmark/streams/writable-manywrites.js
Expand Up @@ -7,11 +7,12 @@ const bench = common.createBenchmark(main, {
n: [2e6],
sync: ['yes', 'no'],
writev: ['yes', 'no'],
callback: ['yes', 'no']
callback: ['yes', 'no'],
len: [1024, 32 * 1024]
});

function main({ n, sync, writev, callback }) {
const b = Buffer.allocUnsafe(1024);
function main({ n, sync, writev, callback, len }) {
const b = Buffer.allocUnsafe(len);
const s = new Writable();
sync = sync === 'yes';

Expand Down
11 changes: 6 additions & 5 deletions lib/_stream_writable.js
Expand Up @@ -344,11 +344,6 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {

state.length += len;

const ret = state.length < state.highWaterMark;
// We must ensure that previous needDrain will not be reset to false.
if (!ret)
state.needDrain = true;

if (state.writing || state.corked || state.errored) {
const last = state.lastBufferedRequest;
state.lastBufferedRequest = {
Expand All @@ -367,6 +362,12 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
doWrite(stream, state, false, len, chunk, encoding, cb);
}

const ret = state.length < state.highWaterMark;

// We must ensure that previous needDrain will not be reset to false.
if (!ret)
state.needDrain = true;

// Return false if errored or destroyed in order to break
// any synchronous while(stream.write(data)) loops.
return ret && !state.errored && !state.destroyed;
Expand Down
6 changes: 5 additions & 1 deletion test/parallel/test-stream-big-packet.js
Expand Up @@ -36,7 +36,11 @@ class TestStream extends stream.Transform {
}
}

const s1 = new stream.PassThrough();
const s1 = new stream.Transform({
transform(chunk, encoding, cb) {
process.nextTick(cb, null, chunk);
}
});
const s2 = new stream.PassThrough();
const s3 = new TestStream();
s1.pipe(s3);
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-catch-rejections.js
Expand Up @@ -30,7 +30,7 @@ const assert = require('assert');
captureRejections: true,
highWaterMark: 1,
write(chunk, enc, cb) {
cb();
process.nextTick(cb);
}
});

Expand Down
Expand Up @@ -19,7 +19,7 @@ const writable = new stream.Writable({
});
}

cb();
process.nextTick(cb);
}, 3)
});

Expand Down
6 changes: 3 additions & 3 deletions test/parallel/test-stream-pipe-await-drain.js
Expand Up @@ -19,7 +19,7 @@ reader._read = () => {};

writer1._write = common.mustCall(function(chunk, encoding, cb) {
this.emit('chunk-received');
cb();
process.nextTick(cb);
}, 1);

writer1.once('chunk-received', () => {
Expand All @@ -42,7 +42,7 @@ writer2._write = common.mustCall((chunk, encoding, cb) => {
reader._readableState.awaitDrainWriters.size,
1,
'awaitDrain should be 1 after first push, actual is ' +
reader._readableState.awaitDrainWriters
reader._readableState.awaitDrainWriters.size
);
// Not calling cb here to "simulate" slow stream.
// This should be called exactly once, since the first .write() call
Expand All @@ -54,7 +54,7 @@ writer3._write = common.mustCall((chunk, encoding, cb) => {
reader._readableState.awaitDrainWriters.size,
2,
'awaitDrain should be 2 after second push, actual is ' +
reader._readableState.awaitDrainWriters
reader._readableState.awaitDrainWriters.size
);
// Not calling cb here to "simulate" slow stream.
// This should be called exactly once, since the first .write() call
Expand Down
6 changes: 4 additions & 2 deletions test/parallel/test-stream-writable-needdrain-state.js
Expand Up @@ -10,8 +10,10 @@ const transform = new stream.Transform({
});

function _transform(chunk, encoding, cb) {
assert.strictEqual(transform._writableState.needDrain, true);
cb();
process.nextTick(() => {
assert.strictEqual(transform._writableState.needDrain, true);
cb();
});
}

assert.strictEqual(transform._writableState.needDrain, false);
Expand Down

0 comments on commit abc1d0b

Please sign in to comment.