diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 012d99de0357f2..8dc4e5792c47d8 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -166,17 +166,14 @@ async function pump(iterable, writable, finish) { } function pipeline(...streams) { - const callback = once(popCallback(streams)); + return pipelineImpl(streams, once(popCallback(streams))); +} - // stream.pipeline(streams, callback) - if (ArrayIsArray(streams[0]) && streams.length === 1) { +function pipelineImpl(streams, callback, opts) { + if (streams.length === 1 && ArrayIsArray(streams[0])) { streams = streams[0]; } - return pipelineImpl(streams, callback); -} - -function pipelineImpl(streams, callback, opts) { if (streams.length < 2) { throw new ERR_MISSING_ARGS('streams'); } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index b21e1ce52b3cb3..067a7e65a4c67d 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1406,3 +1406,44 @@ const tsp = require('timers/promises'); })); ac.abort(); } + +{ + async function run() { + let finished = false; + let text = ''; + const write = new Writable({ + write(data, enc, cb) { + text += data; + cb(); + } + }); + write.on('finish', () => { + finished = true; + }); + + await pipelinep([Readable.from('Hello World!'), write]); + assert(finished); + assert.strictEqual(text, 'Hello World!'); + } + + run(); +} + +{ + let finished = false; + let text = ''; + const write = new Writable({ + write(data, enc, cb) { + text += data; + cb(); + } + }); + write.on('finish', () => { + finished = true; + }); + + pipeline([Readable.from('Hello World!'), write], common.mustSucceed(() => { + assert(finished); + assert.strictEqual(text, 'Hello World!'); + })); +}