Skip to content

Commit

Permalink
stream: pipeline don't destroy Duplex src before 'finish'
Browse files Browse the repository at this point in the history
pipeline was too agressive with destroying Duplex
streams which were the first argument into pipeline.

Just because it's !writable does not mean that it
is safe to be destroyed, unless it has also emitted
'finish'.

Fixes: nodejs#32955
  • Loading branch information
ronag committed Apr 21, 2020
1 parent 8a3fa32 commit e1edd39
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 6 deletions.
31 changes: 25 additions & 6 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,30 @@ function destroyer(stream, reading, writing, final, callback) {
return callback();
}

if (!err && reading && !writing && stream.writable) {
return callback();
}
const wState = stream._writableState;

const writableEnded = stream.writableEnded ||
(wState && wState.ended);
const writableFinished = stream.writableFinished ||
(wState && wState.finished);

const willFinish = stream.writable ||
(writableEnded && !writableFinished);
const willEnd = stream.readable;

if (err || !final || !stream.readable) {
destroyImpl.destroyer(stream, err);
if (!err) {
// First
if (reading && !writing && willFinish) {
return callback();
}

// Last
if (!reading && writing && willEnd) {
return callback();
}
}

destroyImpl.destroyer(stream, err);
callback(err);
});

Expand All @@ -81,7 +98,9 @@ function destroyer(stream, reading, writing, final, callback) {
.once('end', _destroy)
.once('error', _destroy);
} else {
_destroy(err);
// Do an extra tick so that 'finish' has a chance to be emitted if
// first stream is Duplex.
process.nextTick(_destroy, err);
}
});

Expand Down
49 changes: 49 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const {
const assert = require('assert');
const http = require('http');
const { promisify } = require('util');
const net = require('net');

{
let finished = false;
Expand Down Expand Up @@ -1118,3 +1119,51 @@ const { promisify } = require('util');
assert.strictEqual(closed, true);
}));
}

{
const server = net.createServer(common.mustCall((socket) => {
// echo server
pipeline(socket, socket, common.mustCall());
// 13 force destroys the socket before it has a chance to emit finish
socket.on('finish', common.mustCall(() => {
server.close();
}));
})).listen(0, common.mustCall(() => {
const socket = net.connect(server.address().port);
socket.end();
}));
}

{
const d = new Duplex({
autoDestroy: false,
write: common.mustCall((data, enc, cb) => {
d.push(data);
cb();
}),
read: common.mustCall(() => {
d.push(null);
}),
final: common.mustCall((cb) => {
setTimeout(() => {
assert.strictEqual(d.destroyed, false);
cb();
}, 1000);
}),
// `destroy()` won't be invoked by pipeline since
// the writable side has not completed when
// the pipeline has completed.
destroy: common.mustNotCall()
});

const sink = new Writable({
write: common.mustCall((data, enc, cb) => {
cb();
})
});

pipeline(d, sink, common.mustCall());

d.write('test');
d.end();
}

0 comments on commit e1edd39

Please sign in to comment.