diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index a004ce20d0aeb8..6fff20c94ec7f3 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -716,35 +716,39 @@ Readable.prototype.pipe = function(dest, pipeOpts) { ondrain(); } + function pause() { + // If the user unpiped during `dest.write()`, it is possible + // to get stuck in a permanently paused state if that write + // also returned false. + // => Check whether `dest` is still a piping destination. + if (!cleanedUp) { + if (state.pipes.length === 1 && state.pipes[0] === dest) { + debug('false write response, pause', 0); + state.awaitDrainWriters = dest; + state.multiAwaitDrain = false; + } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { + debug('false write response, pause', state.awaitDrainWriters.size); + state.awaitDrainWriters.add(dest); + } + src.pause(); + } + if (!ondrain) { + // When the dest drains, it reduces the awaitDrain counter + // on the source. This would be more elegant with a .once() + // handler in flow(), but adding and removing repeatedly is + // too slow. + ondrain = pipeOnDrain(src, dest); + dest.on('drain', ondrain); + } + } + src.on('data', ondata); function ondata(chunk) { debug('ondata'); const ret = dest.write(chunk); debug('dest.write', ret); if (ret === false) { - // If the user unpiped during `dest.write()`, it is possible - // to get stuck in a permanently paused state if that write - // also returned false. - // => Check whether `dest` is still a piping destination. - if (!cleanedUp) { - if (state.pipes.length === 1 && state.pipes[0] === dest) { - debug('false write response, pause', 0); - state.awaitDrainWriters = dest; - state.multiAwaitDrain = false; - } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { - debug('false write response, pause', state.awaitDrainWriters.size); - state.awaitDrainWriters.add(dest); - } - src.pause(); - } - if (!ondrain) { - // When the dest drains, it reduces the awaitDrain counter - // on the source. This would be more elegant with a .once() - // handler in flow(), but adding and removing repeatedly is - // too slow. - ondrain = pipeOnDrain(src, dest); - dest.on('drain', ondrain); - } + pause(); } } @@ -793,7 +797,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { if (dest.writableNeedDrain === true) { if (state.flowing) { - src.pause(); + pause(); } } else if (!state.flowing) { debug('pipe resume'); diff --git a/test/parallel/test-stream-pipe-needDrain.js b/test/parallel/test-stream-pipe-needDrain.js index 7e8f5f6f47a70a..0836c81da22438 100644 --- a/test/parallel/test-stream-pipe-needDrain.js +++ b/test/parallel/test-stream-pipe-needDrain.js @@ -5,12 +5,13 @@ const assert = require('assert'); const Readable = require('_stream_readable'); const Writable = require('_stream_writable'); -// Pipe should not continue writing if writable needs drain. +// Pipe should pause temporarily if writable needs drain. { const w = new Writable({ write(buf, encoding, callback) { - - } + process.nextTick(callback); + }, + highWaterMark: 1 }); while (w.write('asd')); @@ -20,10 +21,12 @@ const Writable = require('_stream_writable'); const r = new Readable({ read() { this.push('asd'); + this.push(null); } }); - w.write = common.mustNotCall(); + r.on('pause', common.mustCall(2)); + r.on('end', common.mustCall()); r.pipe(w); }