Skip to content

Commit

Permalink
stream: pipeline should drain empty readable
Browse files Browse the repository at this point in the history
This simplifies some cases where the last stream is a Duplex
without any expected output.

await pipeline(readable, duplex)

PR-URL: #40654
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
  • Loading branch information
ronag authored and danielleadams committed Feb 1, 2022
1 parent 7237bcc commit c9f1398
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
10 changes: 9 additions & 1 deletion lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const {
isIterable,
isReadableNodeStream,
isNodeStream,
isReadableFinished,
} = require('internal/streams/utils');
const { AbortController } = require('internal/abort_controller');

Expand Down Expand Up @@ -229,7 +230,14 @@ function pipelineImpl(streams, callback, opts) {

if (isNodeStream(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, finish));
destroys.push(destroyer(stream, reading, writing, (err) => {
if (!err && !reading && isReadableFinished(stream, false)) {
stream.read(0);
destroyer(stream, true, writing, finish);
} else {
finish(err);
}
}));
}

if (i === 0) {
Expand Down
22 changes: 21 additions & 1 deletion test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ const tsp = require('timers/promises');
const src = new PassThrough();
const dst = new PassThrough();
pipeline(src, dst, common.mustSucceed(() => {
assert.strictEqual(dst.destroyed, false);
assert.strictEqual(dst.destroyed, true);
}));
src.end();
}
Expand Down Expand Up @@ -1447,3 +1447,23 @@ const tsp = require('timers/promises');
assert.strictEqual(text, 'Hello World!');
}));
}

{
const pipelinePromise = promisify(pipeline);

async function run() {
const read = new Readable({
read() {}
});

const duplex = new PassThrough();

read.push(null);

await pipelinePromise(read, duplex);

assert.strictEqual(duplex.destroyed, true);
}

run();
}

0 comments on commit c9f1398

Please sign in to comment.