diff --git a/doc/api/stream.md b/doc/api/stream.md index 06233dad2a9e7f..613a0b7c87a8d6 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -59,6 +59,227 @@ functions for streams that return `Promise` objects rather than using callbacks. The API is accessible via `require('node:stream/promises')` or `require('node:stream').promises`. +### `stream.pipeline(source[, ...transforms], destination[, options])` + +### `stream.pipeline(streams[, options])` + + + +* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]} +* `source` {Stream|Iterable|AsyncIterable|Function} + * Returns: {Promise|AsyncIterable} +* `...transforms` {Stream|Function} + * `source` {AsyncIterable} + * Returns: {Promise|AsyncIterable} +* `destination` {Stream|Function} + * `source` {AsyncIterable} + * Returns: {Promise|AsyncIterable} +* `options` {Object} + * `signal` {AbortSignal} + * `end` {boolean} +* Returns: {Promise} Fulfills when the pipeline is complete. + +```cjs +const { pipeline } = require('node:stream/promises'); +const fs = require('node:fs'); +const zlib = require('node:zlib'); + +async function run() { + await pipeline( + fs.createReadStream('archive.tar'), + zlib.createGzip(), + fs.createWriteStream('archive.tar.gz'), + ); + console.log('Pipeline succeeded.'); +} + +run().catch(console.error); +``` + +```mjs +import { pipeline } from 'node:stream/promises'; +import { createReadStream, createWriteStream } from 'node:fs'; +import { createGzip } from 'node:zlib'; + +await pipeline( + createReadStream('archive.tar'), + createGzip(), + createWriteStream('archive.tar.gz'), +); +console.log('Pipeline succeeded.'); +``` + +To use an `AbortSignal`, pass it inside an options object, as the last argument. +When the signal is aborted, `destroy` will be called on the underlying pipeline, +with an `AbortError`. + +```cjs +const { pipeline } = require('node:stream/promises'); +const fs = require('node:fs'); +const zlib = require('node:zlib'); + +async function run() { + const ac = new AbortController(); + const signal = ac.signal; + + setImmediate(() => ac.abort()); + await pipeline( + fs.createReadStream('archive.tar'), + zlib.createGzip(), + fs.createWriteStream('archive.tar.gz'), + { signal }, + ); +} + +run().catch(console.error); // AbortError +``` + +```mjs +import { pipeline } from 'node:stream/promises'; +import { createReadStream, createWriteStream } from 'node:fs'; +import { createGzip } from 'node:zlib'; + +const ac = new AbortController(); +const { signal } = ac; +setImmediate(() => ac.abort()); +try { + await pipeline( + createReadStream('archive.tar'), + createGzip(), + createWriteStream('archive.tar.gz'), + { signal }, + ); +} catch (err) { + console.error(err); // AbortError +} +``` + +The `pipeline` API also supports async generators: + +```cjs +const { pipeline } = require('node:stream/promises'); +const fs = require('node:fs'); + +async function run() { + await pipeline( + fs.createReadStream('lowercase.txt'), + async function* (source, { signal }) { + source.setEncoding('utf8'); // Work with strings rather than `Buffer`s. + for await (const chunk of source) { + yield await processChunk(chunk, { signal }); + } + }, + fs.createWriteStream('uppercase.txt'), + ); + console.log('Pipeline succeeded.'); +} + +run().catch(console.error); +``` + +```mjs +import { pipeline } from 'node:stream/promises'; +import { createReadStream, createWriteStream } from 'node:fs'; + +await pipeline( + createReadStream('lowercase.txt'), + async function* (source, { signal }) { + source.setEncoding('utf8'); // Work with strings rather than `Buffer`s. + for await (const chunk of source) { + yield await processChunk(chunk, { signal }); + } + }, + createWriteStream('uppercase.txt'), +); +console.log('Pipeline succeeded.'); +``` + +Remember to handle the `signal` argument passed into the async generator. +Especially in the case where the async generator is the source for the +pipeline (i.e. first argument) or the pipeline will never complete. + +```cjs +const { pipeline } = require('node:stream/promises'); +const fs = require('node:fs'); + +async function run() { + await pipeline( + async function* ({ signal }) { + await someLongRunningfn({ signal }); + yield 'asd'; + }, + fs.createWriteStream('uppercase.txt'), + ); + console.log('Pipeline succeeded.'); +} + +run().catch(console.error); +``` + +```mjs +import { pipeline } from 'node:stream/promises'; +import fs from 'node:fs'; +await pipeline( + async function* ({ signal }) { + await someLongRunningfn({ signal }); + yield 'asd'; + }, + fs.createWriteStream('uppercase.txt'), +); +console.log('Pipeline succeeded.'); +``` + +The `pipeline` API provides [callback version][stream-pipeline]: + +### `stream.finished(stream[, options])` + + + +* `stream` {Stream} +* `options` {Object} + * `error` {boolean|undefined} + * `readable` {boolean|undefined} + * `writable` {boolean|undefined} + * `signal`: {AbortSignal|undefined} +* Returns: {Promise} Fulfills when the stream is no + longer readable or writable. + +```cjs +const { finished } = require('node:stream/promises'); +const fs = require('node:fs'); + +const rs = fs.createReadStream('archive.tar'); + +async function run() { + await finished(rs); + console.log('Stream is done reading.'); +} + +run().catch(console.error); +rs.resume(); // Drain the stream. +``` + +```mjs +import { finished } from 'node:stream/promises'; +import { createReadStream } from 'node:fs'; + +const rs = createReadStream('archive.tar'); + +async function run() { + await finished(rs); + console.log('Stream is done reading.'); +} + +run().catch(console.error); +rs.resume(); // Drain the stream. +``` + +The `finished` API provides [callback version][stream-finished]: + ### Object mode All streams created by Node.js APIs operate exclusively on strings and `Buffer` @@ -2425,22 +2646,7 @@ Especially useful in error handling scenarios where a stream is destroyed prematurely (like an aborted HTTP request), and will not emit `'end'` or `'finish'`. -The `finished` API provides promise version: - -```js -const { finished } = require('node:stream/promises'); -const fs = require('node:fs'); - -const rs = fs.createReadStream('archive.tar'); - -async function run() { - await finished(rs); - console.log('Stream is done reading.'); -} - -run().catch(console.error); -rs.resume(); // Drain the stream. -``` +The `finished` API provides [promise version][stream-finished-promise]. `stream.finished()` leaves dangling event listeners (in particular `'error'`, `'end'`, `'finish'` and `'close'`) after `callback` has been @@ -2520,97 +2726,7 @@ pipeline( ); ``` -The `pipeline` API provides a promise version, which can also -receive an options argument as the last parameter with a -`signal` {AbortSignal} property. When the signal is aborted, -`destroy` will be called on the underlying pipeline, with an -`AbortError`. - -```js -const { pipeline } = require('node:stream/promises'); -const fs = require('node:fs'); -const zlib = require('node:zlib'); - -async function run() { - await pipeline( - fs.createReadStream('archive.tar'), - zlib.createGzip(), - fs.createWriteStream('archive.tar.gz'), - ); - console.log('Pipeline succeeded.'); -} - -run().catch(console.error); -``` - -To use an `AbortSignal`, pass it inside an options object, -as the last argument: - -```js -const { pipeline } = require('node:stream/promises'); -const fs = require('node:fs'); -const zlib = require('node:zlib'); - -async function run() { - const ac = new AbortController(); - const signal = ac.signal; - - setTimeout(() => ac.abort(), 1); - await pipeline( - fs.createReadStream('archive.tar'), - zlib.createGzip(), - fs.createWriteStream('archive.tar.gz'), - { signal }, - ); -} - -run().catch(console.error); // AbortError -``` - -The `pipeline` API also supports async generators: - -```js -const { pipeline } = require('node:stream/promises'); -const fs = require('node:fs'); - -async function run() { - await pipeline( - fs.createReadStream('lowercase.txt'), - async function* (source, { signal }) { - source.setEncoding('utf8'); // Work with strings rather than `Buffer`s. - for await (const chunk of source) { - yield await processChunk(chunk, { signal }); - } - }, - fs.createWriteStream('uppercase.txt'), - ); - console.log('Pipeline succeeded.'); -} - -run().catch(console.error); -``` - -Remember to handle the `signal` argument passed into the async generator. -Especially in the case where the async generator is the source for the -pipeline (i.e. first argument) or the pipeline will never complete. - -```js -const { pipeline } = require('node:stream/promises'); -const fs = require('node:fs'); - -async function run() { - await pipeline( - async function* ({ signal }) { - await someLongRunningfn({ signal }); - yield 'asd'; - }, - fs.createWriteStream('uppercase.txt'), - ); - console.log('Pipeline succeeded.'); -} - -run().catch(console.error); -``` +The `pipeline` API provides a [promise version][stream-pipeline-promise]. `stream.pipeline()` will call `stream.destroy(err)` on all streams except: @@ -4544,7 +4660,11 @@ contain multi-byte characters. [stream-_write]: #writable_writechunk-encoding-callback [stream-_writev]: #writable_writevchunks-callback [stream-end]: #writableendchunk-encoding-callback +[stream-finished]: #streamfinishedstream-options-callback +[stream-finished-promise]: #streamfinishedstream-options [stream-pause]: #readablepause +[stream-pipeline]: #streampipelinesource-transforms-destination-callback +[stream-pipeline-promise]: #streampipelinesource-transforms-destination-options [stream-push]: #readablepushchunk-encoding [stream-read]: #readablereadsize [stream-resume]: #readableresume