Skip to content

Commit

Permalink
stream: writableNeedDrain
Browse files Browse the repository at this point in the history
Don't write to a stream which already has a full buffer.

Fixes: #35341

PR-URL: #35348
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
ronag authored and aduh95 committed Nov 10, 2020
1 parent da53a3c commit dd0f8f1
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 1 deletion.
9 changes: 9 additions & 0 deletions doc/api/stream.md
Expand Up @@ -580,6 +580,15 @@ This property contains the number of bytes (or objects) in the queue
ready to be written. The value provides introspection data regarding
the status of the `highWaterMark`.

##### `writable.writableNeedDrain`
<!-- YAML
added: REPLACEME
-->

* {boolean}

Is `true` if the stream's buffer has been full and stream will emit `'drain'`.

##### `writable.writableObjectMode`
<!-- YAML
added: v12.3.0
Expand Down
5 changes: 5 additions & 0 deletions lib/_http_outgoing.js
Expand Up @@ -660,6 +660,11 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableEnded', {
get: function() { return this.finished; }
});

ObjectDefineProperty(OutgoingMessage.prototype, 'writableNeedDrain', {
get: function() {
return !this.destroyed && !this.finished && this[kNeedDrain];
}
});

const crlf_buf = Buffer.from('\r\n');
OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
Expand Down
2 changes: 2 additions & 0 deletions lib/internal/streams/duplex.js
Expand Up @@ -87,6 +87,8 @@ ObjectDefineProperties(Duplex.prototype, {
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableCorked'),
writableEnded:
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableEnded'),
writableNeedDrain:
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableNeedDrain'),

destroyed: {
get() {
Expand Down
4 changes: 4 additions & 0 deletions lib/internal/streams/pipeline.js
Expand Up @@ -123,6 +123,10 @@ async function pump(iterable, writable, finish) {
}
let error;
try {
if (writable.writableNeedDrain === true) {
await EE.once(writable, 'drain');
}

for await (const chunk of iterable) {
if (!writable.write(chunk)) {
if (writable.destroyed) return;
Expand Down
7 changes: 6 additions & 1 deletion lib/internal/streams/readable.js
Expand Up @@ -783,7 +783,12 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.emit('pipe', src);

// Start the flow if it hasn't been started already.
if (!state.flowing) {

if (dest.writableNeedDrain === true) {
if (state.flowing) {
src.pause();
}
} else if (!state.flowing) {
debug('pipe resume');
src.resume();
}
Expand Down
8 changes: 8 additions & 0 deletions lib/internal/streams/writable.js
Expand Up @@ -805,6 +805,14 @@ ObjectDefineProperties(Writable.prototype, {
}
},

writableNeedDrain: {
get() {
const wState = this._writableState;
if (!wState) return false;
return !wState.destroyed && !wState.ending && wState.needDrain;
}
},

writableHighWaterMark: {
get() {
return this._writableState && this._writableState.highWaterMark;
Expand Down
29 changes: 29 additions & 0 deletions test/parallel/test-stream-pipe-needDrain.js
@@ -0,0 +1,29 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const Readable = require('_stream_readable');
const Writable = require('_stream_writable');

// Pipe should not continue writing if writable needs drain.
{
const w = new Writable({
write(buf, encoding, callback) {

}
});

while (w.write('asd'));

assert.strictEqual(w.writableNeedDrain, true);

const r = new Readable({
read() {
this.push('asd');
}
});

w.write = common.mustNotCall();

r.pipe(w);
}

0 comments on commit dd0f8f1

Please sign in to comment.