Skip to content

Commit

Permalink
doc: piping from async generators using pipeline()
Browse files Browse the repository at this point in the history
PR-URL: #33992
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Trivikram Kamat <trivikr.dev@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
WilliamConnatser authored and mcollina committed Jun 22, 2020
1 parent 7b019fd commit bfc0e3f
Showing 1 changed file with 20 additions and 67 deletions.
87 changes: 20 additions & 67 deletions doc/api/stream.md
Expand Up @@ -2886,80 +2886,33 @@ readable.on('data', (chunk) => {

#### Piping to writable streams from async iterators

In the scenario of writing to a writable stream from an async iterator, ensure
the correct handling of backpressure and errors.
When writing to a writable stream from an async iterator, ensure correct
handling of backpressure and errors. [`stream.pipeline()`][] abstracts away
the handling of backpressure and backpressure-related errors:

```js
const { once } = require('events');
const finished = util.promisify(stream.finished);
const { pipeline } = require('stream');
const util = require('util');
const fs = require('fs');

const writable = fs.createWriteStream('./file');

function drain(writable) {
if (writable.destroyed) {
return Promise.reject(new Error('premature close'));
}
return Promise.race([
once(writable, 'drain'),
once(writable, 'close')
.then(() => Promise.reject(new Error('premature close')))
]);
}

async function pump(iterable, writable) {
for await (const chunk of iterable) {
// Handle backpressure on write().
if (!writable.write(chunk)) {
await drain(writable);
}
// Callback Pattern
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err);
} else {
console.log(value, 'value returned');
}
writable.end();
}

(async function() {
// Ensure completion without errors.
await Promise.all([
pump(iterable, writable),
finished(writable)
]);
})();
```

In the above, errors on `write()` would be caught and thrown by the
`once()` listener for the `'drain'` event, since `once()` will also handle the
`'error'` event. To ensure completion of the write stream without errors,
it is safer to use the `finished()` method as above, instead of using the
`once()` listener for the `'finish'` event. Under certain cases, an `'error'`
event could be emitted by the writable stream after `'finish'` and as `once()`
will release the `'error'` handler on handling the `'finish'` event, it could
result in an unhandled error.

Alternatively, the readable stream could be wrapped with `Readable.from()` and
then piped via `.pipe()`:

```js
const finished = util.promisify(stream.finished);

const writable = fs.createWriteStream('./file');

(async function() {
const readable = Readable.from(iterable);
readable.pipe(writable);
// Ensure completion without errors.
await finished(writable);
})();
```

Or, using `stream.pipeline()` to pipe streams:

```js
const pipeline = util.promisify(stream.pipeline);

const writable = fs.createWriteStream('./file');
});

(async function() {
await pipeline(iterable, writable);
})();
// Promise Pattern
const pipelinePromise = util.promisify(pipeline);
pipelinePromise(iterator, writable)
.then((value) => {
console.log(value, 'value returned');
})
.catch(console.error);
```

<!--type=misc-->
Expand Down

0 comments on commit bfc0e3f

Please sign in to comment.