@@ -1886,16 +1886,14 @@ const { pipeline } = require('stream/promises');
1886
1886
1887
1887
async function run () {
1888
1888
const ac = new AbortController ();
1889
- const options = {
1890
- signal: ac .signal ,
1891
- };
1889
+ const signal = ac .signal ;
1892
1890
1893
1891
setTimeout (() => ac .abort (), 1 );
1894
1892
await pipeline (
1895
1893
fs .createReadStream (' archive.tar' ),
1896
1894
zlib .createGzip (),
1897
1895
fs .createWriteStream (' archive.tar.gz' ),
1898
- options ,
1896
+ { signal } ,
1899
1897
);
1900
1898
}
1901
1899
@@ -1911,10 +1909,10 @@ const fs = require('fs');
1911
1909
async function run () {
1912
1910
await pipeline (
1913
1911
fs .createReadStream (' lowercase.txt' ),
1914
- async function * (source ) {
1912
+ async function * (source , signal ) {
1915
1913
source .setEncoding (' utf8' ); // Work with strings rather than `Buffer`s.
1916
1914
for await (const chunk of source ) {
1917
- yield chunk . toUpperCase ( );
1915
+ yield await processChunk (chunk, { signal } );
1918
1916
}
1919
1917
},
1920
1918
fs .createWriteStream (' uppercase.txt' )
@@ -1925,6 +1923,28 @@ async function run() {
1925
1923
run ().catch (console .error );
1926
1924
```
1927
1925
1926
+ Remember to handle the ` signal ` argument passed into the async generator.
1927
+ Especially in the case where the async generator is the source for the
1928
+ pipeline (i.e. first argument) or the pipeline will never complete.
1929
+
1930
+ ``` js
1931
+ const { pipeline } = require (' stream/promises' );
1932
+ const fs = require (' fs' );
1933
+
1934
+ async function run () {
1935
+ await pipeline (
1936
+ async function * (signal ) {
1937
+ await someLongRunningfn ({ signal });
1938
+ yield ' asd' ;
1939
+ },
1940
+ fs .createWriteStream (' uppercase.txt' )
1941
+ );
1942
+ console .log (' Pipeline succeeded.' );
1943
+ }
1944
+
1945
+ run ().catch (console .error );
1946
+ ```
1947
+
1928
1948
` stream.pipeline() ` will call ` stream.destroy(err) ` on all streams except:
1929
1949
* ` Readable ` streams which have emitted ` 'end' ` or ` 'close' ` .
1930
1950
* ` Writable ` streams which have emitted ` 'finish' ` or ` 'close' ` .
@@ -3342,13 +3362,20 @@ the `Readable.from()` utility method:
3342
3362
``` js
3343
3363
const { Readable } = require (' stream' );
3344
3364
3365
+ const ac = new AbortController ();
3366
+ const signal = ac .signal ;
3367
+
3345
3368
async function * generate () {
3346
3369
yield ' a' ;
3370
+ await someLongRunningFn ({ signal });
3347
3371
yield ' b' ;
3348
3372
yield ' c' ;
3349
3373
}
3350
3374
3351
3375
const readable = Readable .from (generate ());
3376
+ readable .on (' close' , () => {
3377
+ ac .abort ();
3378
+ });
3352
3379
3353
3380
readable .on (' data' , (chunk ) => {
3354
3381
console .log (chunk);
@@ -3368,21 +3395,31 @@ const { pipeline: pipelinePromise } = require('stream/promises');
3368
3395
3369
3396
const writable = fs .createWriteStream (' ./file' );
3370
3397
3398
+ const ac = new AbortController ();
3399
+ const signal = ac .signal ;
3400
+
3401
+ const iterator = createIterator ({ signal });
3402
+
3371
3403
// Callback Pattern
3372
3404
pipeline (iterator, writable, (err , value ) => {
3373
3405
if (err) {
3374
3406
console .error (err);
3375
3407
} else {
3376
3408
console .log (value, ' value returned' );
3377
3409
}
3410
+ }).on (' close' , () => {
3411
+ ac .abort ();
3378
3412
});
3379
3413
3380
3414
// Promise Pattern
3381
3415
pipelinePromise (iterator, writable)
3382
3416
.then ((value ) => {
3383
3417
console .log (value, ' value returned' );
3384
3418
})
3385
- .catch (console .error );
3419
+ .catch ((err ) => {
3420
+ console .error (err);
3421
+ ac .abort ();
3422
+ });
3386
3423
```
3387
3424
3388
3425
<!-- type=misc-->
0 commit comments