Skip to content

Commit

Permalink
stream: don't destroy final readable stream in pipeline
Browse files Browse the repository at this point in the history
If the last stream in a pipeline is still usable/readable
don't destroy it to allow further composition.

Fixes: #32105

PR-URL: #32110
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
  • Loading branch information
ronag committed Mar 8, 2020
1 parent c49286b commit 4d93e10
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
13 changes: 10 additions & 3 deletions lib/internal/streams/pipeline.js
Expand Up @@ -25,9 +25,12 @@ let EE;
let PassThrough;
let createReadableStreamAsyncIterator;

function destroyer(stream, reading, writing, callback) {
function destroyer(stream, reading, writing, final, callback) {
const _destroy = once((err) => {
destroyImpl.destroyer(stream, err);
const readable = stream.readable || isRequest(stream);
if (err || !final || !readable) {
destroyImpl.destroyer(stream, err);
}
callback(err);
});

Expand Down Expand Up @@ -68,6 +71,10 @@ function popCallback(streams) {
return streams.pop();
}

function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}

function isPromise(obj) {
return !!(obj && typeof obj.then === 'function');
}
Expand Down Expand Up @@ -159,7 +166,7 @@ function pipeline(...streams) {
}

function wrap(stream, reading, writing, final) {
destroys.push(destroyer(stream, reading, writing, (err) => {
destroys.push(destroyer(stream, reading, writing, final, (err) => {
finish(err, final);
}));
}
Expand Down
48 changes: 47 additions & 1 deletion test/parallel/test-stream-pipeline.js
Expand Up @@ -916,7 +916,7 @@ const { promisify } = require('util');
const dst = new PassThrough({ autoDestroy: false });
pipeline(src, dst, common.mustCall(() => {
assert.strictEqual(src.destroyed, true);
assert.strictEqual(dst.destroyed, true);
assert.strictEqual(dst.destroyed, false);
}));
src.end();
}
Expand All @@ -938,3 +938,49 @@ const { promisify } = require('util');
r.push(null);
r.emit('close');
}

{
const server = http.createServer((req, res) => {
});

server.listen(0, () => {
const req = http.request({
port: server.address().port
});

const body = new PassThrough();
pipeline(
body,
req,
common.mustCall((err) => {
assert(!err);
assert(!req.res);
assert(!req.aborted);
req.abort();
server.close();
})
);
body.end();
});
}

{
const src = new PassThrough();
const dst = new PassThrough();
pipeline(src, dst, common.mustCall((err) => {
assert(!err);
assert.strictEqual(dst.destroyed, false);
}));
src.end();
}

{
const src = new PassThrough();
const dst = new PassThrough();
dst.readable = false;
pipeline(src, dst, common.mustCall((err) => {
assert(!err);
assert.strictEqual(dst.destroyed, true);
}));
src.end();
}

9 comments on commit 4d93e10

@jasnell
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is still an issue in the edgecase where a pipeline is piping a stream back into itself (e.g. like an echo server) ...

const src = new Duplex(/** ... **/);
// ...
pipeline(src, src, () => {});

This will end up failing inconsistently.

@ronag
Copy link
Member Author

@ronag ronag commented on 4d93e10 Mar 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand this example? Could you maybe show a full repro?

@jasnell
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take a look at https://github.com/jasnell/quic/blob/rebase-nodejs-node/test/parallel/test-quic-quicsocket-packetloss-stream-tx.js

Essentially, when you have a Duplex and you're piping it back in to itself, it will fail inconsistently. In the example above, I send a single character at a time from the client that is echoed back by the server. When the stream ends on the client, the result is checked against what is expected. However, the client side of the stream is closing prematurely, which indicates that the server side is closing prematurely. I've traced it back to the pipeline closing things down. When I pipe manually using the 'data' and 'end' events, everything works perfectly.

@ronag
Copy link
Member Author

@ronag ronag commented on 4d93e10 Mar 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll checkout and build your fork and see if I can reproduce it there and make sense of it.

@jasnell
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For more context, it is a recent change that broke this. I just rebased on master today. My prior rebase was on Feb 13th and pipeline was working fine then.

@ronag
Copy link
Member Author

@ronag ronag commented on 4d93e10 Mar 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect #31940

@ronag
Copy link
Member Author

@ronag ronag commented on 4d93e10 Mar 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be fixed once #32158 lands. Do you think we could wait until then before digging into this or would you prefer a solution asap?

@BenoitClaveau
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you known how can I fix it temporary ? My server is down due to this bug.

@ronag
Copy link
Member Author

@ronag ronag commented on 4d93e10 Mar 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jasnell @BenoitClaveau could you try with #32198?

Please sign in to comment.