From 5a55b1df97a1f93c5fb81bf758050d81524ae834 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 24 Feb 2020 23:38:16 +0100 Subject: [PATCH] stream: ensure pipeline always destroys streams There was an edge case where an incorrect assumption was made in regardos whether eos/finished means that the stream is actually destroyed or not. --- lib/internal/streams/pipeline.js | 17 +++++------------ test/parallel/test-stream-pipeline.js | 15 ++++++++++++++- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index d855c2a08af147..8c6cbf7524ea53 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -27,27 +27,20 @@ let createReadableStreamAsyncIterator; function destroyer(stream, reading, writing, callback) { callback = once(callback); - - let closed = false; - stream.on('close', () => { - closed = true; - }); + let destroyed = false; if (eos === undefined) eos = require('internal/streams/end-of-stream'); eos(stream, { readable: reading, writable: writing }, (err) => { - if (err) return callback(err); - closed = true; - callback(); + if (destroyed) return; + destroyed = true; + destroyImpl.destroyer(stream, err); + callback(err); }); - let destroyed = false; return (err) => { - if (closed) return; if (destroyed) return; destroyed = true; - destroyImpl.destroyer(stream, err); - callback(err || new ERR_STREAM_DESTROYED('pipe')); }; } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index b3d4064c6a9783..6bfa1331834968 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -763,7 +763,10 @@ const { promisify } = require('util'); s.emit('data', 'asd'); s.emit('end'); }); - s.close = common.mustCall(); + // 'destroyer' can be called multiple times, + // once from stream wrapper and + // once from iterator wrapper. + s.close = common.mustCallAtLeast(1); let ret = ''; pipeline(s, async function(source) { for await (const chunk of source) { @@ -909,3 +912,13 @@ const { promisify } = require('util'); assert.strictEqual(err.message, 'kaboom'); })); } + +{ + const src = new PassThrough({ autoDestroy: false }); + const dst = new PassThrough({ autoDestroy: false }); + pipeline(src, dst, common.mustCall(() => { + assert.strictEqual(src.destroyed, true); + assert.strictEqual(dst.destroyed, true); + })); + src.end(); +}