Skip to content

Commit 97f3072

Browse files
ronagrichardlau
authored andcommittedSep 10, 2021
stream: add signal support to pipeline generators
Generators in pipeline must be able to be aborted or pipeline can deadlock. PR-URL: #39067 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 450da99 commit 97f3072

File tree

7 files changed

+117
-33
lines changed

7 files changed

+117
-33
lines changed
 

‎doc/api/stream.md

+44-7
Original file line numberDiff line numberDiff line change
@@ -1886,16 +1886,14 @@ const { pipeline } = require('stream/promises');
18861886

18871887
async function run() {
18881888
const ac = new AbortController();
1889-
const options = {
1890-
signal: ac.signal,
1891-
};
1889+
const signal = ac.signal;
18921890

18931891
setTimeout(() => ac.abort(), 1);
18941892
await pipeline(
18951893
fs.createReadStream('archive.tar'),
18961894
zlib.createGzip(),
18971895
fs.createWriteStream('archive.tar.gz'),
1898-
options,
1896+
{ signal },
18991897
);
19001898
}
19011899

@@ -1911,10 +1909,10 @@ const fs = require('fs');
19111909
async function run() {
19121910
await pipeline(
19131911
fs.createReadStream('lowercase.txt'),
1914-
async function* (source) {
1912+
async function* (source, signal) {
19151913
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
19161914
for await (const chunk of source) {
1917-
yield chunk.toUpperCase();
1915+
yield await processChunk(chunk, { signal });
19181916
}
19191917
},
19201918
fs.createWriteStream('uppercase.txt')
@@ -1925,6 +1923,28 @@ async function run() {
19251923
run().catch(console.error);
19261924
```
19271925

1926+
Remember to handle the `signal` argument passed into the async generator.
1927+
Especially in the case where the async generator is the source for the
1928+
pipeline (i.e. first argument) or the pipeline will never complete.
1929+
1930+
```js
1931+
const { pipeline } = require('stream/promises');
1932+
const fs = require('fs');
1933+
1934+
async function run() {
1935+
await pipeline(
1936+
async function * (signal) {
1937+
await someLongRunningfn({ signal });
1938+
yield 'asd';
1939+
},
1940+
fs.createWriteStream('uppercase.txt')
1941+
);
1942+
console.log('Pipeline succeeded.');
1943+
}
1944+
1945+
run().catch(console.error);
1946+
```
1947+
19281948
`stream.pipeline()` will call `stream.destroy(err)` on all streams except:
19291949
* `Readable` streams which have emitted `'end'` or `'close'`.
19301950
* `Writable` streams which have emitted `'finish'` or `'close'`.
@@ -3342,13 +3362,20 @@ the `Readable.from()` utility method:
33423362
```js
33433363
const { Readable } = require('stream');
33443364

3365+
const ac = new AbortController();
3366+
const signal = ac.signal;
3367+
33453368
async function * generate() {
33463369
yield 'a';
3370+
await someLongRunningFn({ signal });
33473371
yield 'b';
33483372
yield 'c';
33493373
}
33503374

33513375
const readable = Readable.from(generate());
3376+
readable.on('close', () => {
3377+
ac.abort();
3378+
});
33523379

33533380
readable.on('data', (chunk) => {
33543381
console.log(chunk);
@@ -3368,21 +3395,31 @@ const { pipeline: pipelinePromise } = require('stream/promises');
33683395

33693396
const writable = fs.createWriteStream('./file');
33703397

3398+
const ac = new AbortController();
3399+
const signal = ac.signal;
3400+
3401+
const iterator = createIterator({ signal });
3402+
33713403
// Callback Pattern
33723404
pipeline(iterator, writable, (err, value) => {
33733405
if (err) {
33743406
console.error(err);
33753407
} else {
33763408
console.log(value, 'value returned');
33773409
}
3410+
}).on('close', () => {
3411+
ac.abort();
33783412
});
33793413

33803414
// Promise Pattern
33813415
pipelinePromise(iterator, writable)
33823416
.then((value) => {
33833417
console.log(value, 'value returned');
33843418
})
3385-
.catch(console.error);
3419+
.catch((err) => {
3420+
console.error(err);
3421+
ac.abort();
3422+
});
33863423
```
33873424

33883425
<!--type=misc-->

‎lib/internal/streams/compose.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use strict';
22

3-
const pipeline = require('internal/streams/pipeline');
3+
const { pipeline } = require('internal/streams/pipeline');
44
const Duplex = require('internal/streams/duplex');
55
const { destroyer } = require('internal/streams/destroy');
66
const {

‎lib/internal/streams/duplexify.js

+14-4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const from = require('internal/streams/from');
2626
const {
2727
isBlob,
2828
} = require('internal/blob');
29+
const { AbortController } = require('internal/abort_controller');
2930

3031
const {
3132
FunctionPrototypeCall
@@ -81,14 +82,15 @@ module.exports = function duplexify(body, name) {
8182
// }
8283

8384
if (typeof body === 'function') {
84-
const { value, write, final } = fromAsyncGen(body);
85+
const { value, write, final, destroy } = fromAsyncGen(body);
8586

8687
if (isIterable(value)) {
8788
return from(Duplexify, value, {
8889
// TODO (ronag): highWaterMark?
8990
objectMode: true,
9091
write,
91-
final
92+
final,
93+
destroy
9294
});
9395
}
9496

@@ -123,7 +125,8 @@ module.exports = function duplexify(body, name) {
123125
process.nextTick(cb, err);
124126
}
125127
});
126-
}
128+
},
129+
destroy
127130
});
128131
}
129132

@@ -202,15 +205,18 @@ module.exports = function duplexify(body, name) {
202205

203206
function fromAsyncGen(fn) {
204207
let { promise, resolve } = createDeferredPromise();
208+
const ac = new AbortController();
209+
const signal = ac.signal;
205210
const value = fn(async function*() {
206211
while (true) {
207212
const { chunk, done, cb } = await promise;
208213
process.nextTick(cb);
209214
if (done) return;
215+
if (signal.aborted) throw new AbortError();
210216
yield chunk;
211217
({ promise, resolve } = createDeferredPromise());
212218
}
213-
}());
219+
}(), { signal });
214220

215221
return {
216222
value,
@@ -219,6 +225,10 @@ function fromAsyncGen(fn) {
219225
},
220226
final(cb) {
221227
resolve({ done: true, cb });
228+
},
229+
destroy(err, cb) {
230+
ac.abort();
231+
cb(err);
222232
}
223233
};
224234
}

‎lib/internal/streams/pipeline.js

+35-5
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,20 @@ const {
2121
ERR_MISSING_ARGS,
2222
ERR_STREAM_DESTROYED,
2323
},
24+
AbortError,
2425
} = require('internal/errors');
2526

26-
const { validateCallback } = require('internal/validators');
27+
const {
28+
validateCallback,
29+
validateAbortSignal
30+
} = require('internal/validators');
2731

2832
const {
2933
isIterable,
3034
isReadableNodeStream,
3135
isNodeStream,
3236
} = require('internal/streams/utils');
37+
const { AbortController } = require('internal/abort_controller');
3338

3439
let PassThrough;
3540
let Readable;
@@ -168,19 +173,37 @@ function pipeline(...streams) {
168173
streams = streams[0];
169174
}
170175

176+
return pipelineImpl(streams, callback);
177+
}
178+
179+
function pipelineImpl(streams, callback, opts) {
171180
if (streams.length < 2) {
172181
throw new ERR_MISSING_ARGS('streams');
173182
}
174183

184+
const ac = new AbortController();
185+
const signal = ac.signal;
186+
const outerSignal = opts?.signal;
187+
188+
validateAbortSignal(outerSignal, 'options.signal');
189+
190+
function abort() {
191+
finishImpl(new AbortError());
192+
}
193+
194+
outerSignal?.addEventListener('abort', abort);
195+
175196
let error;
176197
let value;
177198
const destroys = [];
178199

179200
let finishCount = 0;
180201

181202
function finish(err) {
182-
const final = --finishCount === 0;
203+
finishImpl(err, --finishCount === 0);
204+
}
183205

206+
function finishImpl(err, final) {
184207
if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
185208
error = err;
186209
}
@@ -193,6 +216,9 @@ function pipeline(...streams) {
193216
destroys.shift()(error);
194217
}
195218

219+
outerSignal?.removeEventListener('abort', abort);
220+
ac.abort();
221+
196222
if (final) {
197223
callback(error, value);
198224
}
@@ -211,7 +237,7 @@ function pipeline(...streams) {
211237

212238
if (i === 0) {
213239
if (typeof stream === 'function') {
214-
ret = stream();
240+
ret = stream({ signal });
215241
if (!isIterable(ret)) {
216242
throw new ERR_INVALID_RETURN_VALUE(
217243
'Iterable, AsyncIterable or Stream', 'source', ret);
@@ -223,7 +249,7 @@ function pipeline(...streams) {
223249
}
224250
} else if (typeof stream === 'function') {
225251
ret = makeAsyncIterable(ret);
226-
ret = stream(ret);
252+
ret = stream(ret, { signal });
227253

228254
if (reading) {
229255
if (!isIterable(ret, true)) {
@@ -291,7 +317,11 @@ function pipeline(...streams) {
291317
}
292318
}
293319

320+
if (signal?.aborted || outerSignal?.aborted) {
321+
process.nextTick(abort);
322+
}
323+
294324
return ret;
295325
}
296326

297-
module.exports = pipeline;
327+
module.exports = { pipelineImpl, pipeline };

‎lib/stream.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ const {
2929
promisify: { custom: customPromisify },
3030
} = require('internal/util');
3131

32-
const pipeline = require('internal/streams/pipeline');
3332
const compose = require('internal/streams/compose');
33+
const { pipeline } = require('internal/streams/pipeline');
3434
const { destroyer } = require('internal/streams/destroy');
3535
const eos = require('internal/streams/end-of-stream');
3636
const internalBuffer = require('internal/buffer');

‎lib/stream/promises.js

+3-15
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,12 @@ const {
55
Promise,
66
} = primordials;
77

8-
const {
9-
addAbortSignalNoValidate,
10-
} = require('internal/streams/add-abort-signal');
11-
12-
const {
13-
validateAbortSignal,
14-
} = require('internal/validators');
15-
168
const {
179
isIterable,
1810
isNodeStream,
1911
} = require('internal/streams/utils');
2012

21-
const pl = require('internal/streams/pipeline');
13+
const { pipelineImpl: pl } = require('internal/streams/pipeline');
2214
const eos = require('internal/streams/end-of-stream');
2315

2416
function pipeline(...streams) {
@@ -29,19 +21,15 @@ function pipeline(...streams) {
2921
!isNodeStream(lastArg) && !isIterable(lastArg)) {
3022
const options = ArrayPrototypePop(streams);
3123
signal = options.signal;
32-
validateAbortSignal(signal, 'options.signal');
3324
}
3425

35-
const pipe = pl(...streams, (err, value) => {
26+
pl(streams, (err, value) => {
3627
if (err) {
3728
reject(err);
3829
} else {
3930
resolve(value);
4031
}
41-
});
42-
if (signal) {
43-
addAbortSignalNoValidate(signal, pipe);
44-
}
32+
}, { signal });
4533
});
4634
}
4735

‎test/parallel/test-stream-pipeline.js

+19
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ const {
1111
Duplex,
1212
addAbortSignal,
1313
} = require('stream');
14+
const pipelinep = require('stream/promises').pipeline;
1415
const assert = require('assert');
1516
const http = require('http');
1617
const { promisify } = require('util');
1718
const net = require('net');
19+
const tsp = require('timers/promises');
1820

1921
{
2022
let finished = false;
@@ -1387,3 +1389,20 @@ const net = require('net');
13871389
assert.strictEqual(res, content);
13881390
}));
13891391
}
1392+
1393+
{
1394+
const ac = new AbortController();
1395+
const signal = ac.signal;
1396+
pipelinep(
1397+
async function * ({ signal }) {
1398+
await tsp.setTimeout(1e6, signal);
1399+
},
1400+
async function(source) {
1401+
1402+
},
1403+
{ signal }
1404+
).catch(common.mustCall((err) => {
1405+
assert.strictEqual(err.name, 'AbortError');
1406+
}));
1407+
ac.abort();
1408+
}

0 commit comments

Comments
 (0)
Please sign in to comment.