streams: add signal support to pipeline generators
Generators in pipeline must be able to be aborted, or the pipeline
can deadlock.
ronag committed Jun 18, 2021
1 parent 0536be2 commit 01d0351
Showing 5 changed files with 102 additions and 31 deletions.
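
For context, a minimal sketch (hypothetical; file names and timings are placeholders) of the pattern this enables: a generator used as the pipeline source receives an `AbortSignal` and can stop producing when the pipeline is torn down, instead of leaving it deadlocked.

```js
const { pipeline } = require('stream/promises');
const fs = require('fs');

async function run() {
  const ac = new AbortController();
  setTimeout(() => ac.abort(), 100);

  await pipeline(
    // With this change, a generator used as the source is called with the
    // pipeline's signal as its first argument.
    async function * (signal) {
      while (!signal.aborted) {
        yield 'tick\n';
        await new Promise((resolve) => setTimeout(resolve, 10));
      }
    },
    fs.createWriteStream('out.txt'), // placeholder destination
    { signal: ac.signal }
  );
}

// Rejects with an AbortError shortly after ac.abort() fires.
run().catch(console.error);
```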
51 changes: 44 additions & 7 deletions doc/api/stream.md
@@ -1812,16 +1812,14 @@ const { pipeline } = require('stream/promises');

async function run() {
const ac = new AbortController();
const options = {
signal: ac.signal,
};
const signal = ac.signal;

setTimeout(() => ac.abort(), 1);
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
options,
{ signal },
);
}

@@ -1837,10 +1835,10 @@ const fs = require('fs');
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source) {
async function* (source, signal) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield chunk.toUpperCase();
yield await processChunk(chunk, { signal });
}
},
fs.createWriteStream('uppercase.txt')
@@ -1851,6 +1849,28 @@ async function run() {
run().catch(console.error);
```

Remember to handle the `signal` argument passed into the async generator.
This is especially important when the async generator is the source for the
pipeline (i.e. the first argument), since otherwise the pipeline will never
complete.

```js
const { pipeline } = require('stream/promises');
const fs = require('fs');

async function run() {
await pipeline(
async function * (signal) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt')
);
console.log('Pipeline succeeded.');
}

run().catch(console.error);
```

`stream.pipeline()` will call `stream.destroy(err)` on all streams except:
* `Readable` streams which have emitted `'end'` or `'close'`.
* `Writable` streams which have emitted `'finish'` or `'close'`.
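
For example (a hypothetical sketch; the archive file names are placeholders), when a pipeline fails, the streams that had not yet finished cleanly have already been destroyed by the time the callback runs:

```js
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

const source = fs.createReadStream('archive.tar');
const gzip = zlib.createGzip();
const destination = fs.createWriteStream('archive.tar.gz');

pipeline(source, gzip, destination, (err) => {
  if (err) {
    console.error('Pipeline failed.', err);
    // Streams that had not already emitted 'end'/'finish'/'close' were
    // destroyed with `err`.
    console.log('destination destroyed?', destination.destroyed);
  } else {
    console.log('Pipeline succeeded.');
  }
});
```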
@@ -3139,13 +3159,20 @@ the `Readable.from()` utility method:
```js
const { Readable } = require('stream');

const ac = new AbortController();
const signal = ac.signal;

async function * generate() {
yield 'a';
await someLongRunningFn({ signal });
yield 'b';
yield 'c';
}

const readable = Readable.from(generate());
readable.on('close', () => {
ac.abort();
});

readable.on('data', (chunk) => {
console.log(chunk);
@@ -3165,21 +3192,31 @@ const { pipeline: pipelinePromise } = require('stream/promises');

const writable = fs.createWriteStream('./file');

const ac = new AbortController();
const signal = ac.signal;

const iterator = createIterator({ signal });

// Callback Pattern
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err);
} else {
console.log(value, 'value returned');
}
}).on('close', () => {
ac.abort();
});

// Promise Pattern
pipelinePromise(iterator, writable)
.then((value) => {
console.log(value, 'value returned');
})
.catch(console.error);
.catch((err) => {
console.error(err);
ac.abort();
});
```

<!--type=misc-->
43 changes: 35 additions & 8 deletions lib/internal/streams/pipeline.js
@@ -22,9 +22,13 @@ const {
ERR_STREAM_DESTROYED,
ERR_STREAM_PREMATURE_CLOSE,
},
AbortError,
} = require('internal/errors');

const { validateCallback } = require('internal/validators');
const {
validateCallback,
validateAbortSignal
} = require('internal/validators');

function noop() {}

@@ -34,6 +38,7 @@ const {
isStream,
} = require('internal/streams/utils');
const assert = require('internal/assert');
const { AbortController } = require('internal/abort_controller');

let PassThrough;
let Readable;
@@ -176,19 +181,37 @@ function pipeline(...streams) {
streams = streams[0];
}

return pipelineImpl(streams, callback);
}

function pipelineImpl(streams, callback, opts) {
if (streams.length < 2) {
throw new ERR_MISSING_ARGS('streams');
}

const ac = new AbortController();
const signal = ac.signal;
const outerSignal = opts?.signal;

validateAbortSignal(outerSignal, 'options.signal');

function abort() {
finishImpl(new AbortError());
}

outerSignal?.addEventListener('abort', abort);

let error;
let value;
const destroys = [];

let finishCount = 0;

function finish(err) {
const final = --finishCount === 0;
finishImpl(err, --finishCount === 0);
}

function finishImpl(err, final) {
if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
error = err;
}
@@ -201,6 +224,9 @@ function pipeline(...streams) {
destroys.shift()(error);
}

outerSignal?.removeEventListener('abort', abort);
ac.abort();

if (final) {
callback(error, value);
}
@@ -219,7 +245,7 @@ function pipeline(...streams) {

if (i === 0) {
if (typeof stream === 'function') {
ret = stream();
ret = stream(signal);
if (!isIterable(ret)) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
@@ -233,7 +259,7 @@ }
}
} else if (typeof stream === 'function') {
ret = makeAsyncIterable(ret);
ret = stream(ret);
ret = stream(ret, signal);

if (reading) {
if (!isIterable(ret, true)) {
@@ -303,10 +329,11 @@ function pipeline(...streams) {
}
}

// TODO(ronag): Consider returning a Duplex proxy if the first argument
// is a writable. Would improve composability.
// See, https://github.com/nodejs/node/issues/32020
if (signal?.aborted || outerSignal?.aborted) {
process.nextTick(abort);
}

return ret;
}

module.exports = pipeline;
module.exports = { pipelineImpl, pipeline };
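
A usage sketch (hypothetical, public API only) of what the plumbing above means in practice: the `signal` handed to each generator is the pipeline's internal one, aborted from `finishImpl()`, so it fires on any teardown (including a failure elsewhere in the pipeline), not only when the caller's `options.signal` is aborted.

```js
const { pipeline } = require('stream/promises');

async function run() {
  await pipeline(
    // Source generator: called with the pipeline's internal signal.
    async function * (signal) {
      signal.addEventListener('abort', () => {
        console.log('source notified of teardown');
      });
      while (!signal.aborted) {
        yield 'chunk';
        await new Promise((resolve) => setImmediate(resolve));
      }
    },
    // Transform generator: called with (source, signal); fails immediately.
    async function * (source, signal) {
      for await (const chunk of source) {
        throw new Error('boom');
      }
    },
    // Final consumer.
    async function (source) {
      for await (const chunk of source) {
        // Never reached; the transform fails on its first chunk.
      }
    }
  );
}

// The transform's failure also aborts the signal seen by the source.
run().catch((err) => console.error(err.message)); // 'boom'
```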
2 changes: 1 addition & 1 deletion lib/stream.js
@@ -29,7 +29,7 @@ const {
promisify: { custom: customPromisify },
} = require('internal/util');

const pipeline = require('internal/streams/pipeline');
const { pipeline } = require('internal/streams/pipeline');
const { destroyer } = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');
18 changes: 3 additions & 15 deletions lib/stream/promises.js
@@ -5,20 +5,12 @@ const {
Promise,
} = primordials;

const {
addAbortSignalNoValidate,
} = require('internal/streams/add-abort-signal');

const {
validateAbortSignal,
} = require('internal/validators');

const {
isIterable,
isStream,
} = require('internal/streams/utils');

const pl = require('internal/streams/pipeline');
const { pipelineImpl: pl } = require('internal/streams/pipeline');
const eos = require('internal/streams/end-of-stream');

function pipeline(...streams) {
@@ -29,19 +21,15 @@ function pipeline(...streams) {
!isStream(lastArg) && !isIterable(lastArg)) {
const options = ArrayPrototypePop(streams);
signal = options.signal;
validateAbortSignal(signal, 'options.signal');
}

const pipe = pl(...streams, (err, value) => {
pl(streams, (err, value) => {
if (err) {
reject(err);
} else {
resolve(value);
}
});
if (signal) {
addAbortSignalNoValidate(signal, pipe);
}
}, { signal });
});
}

19 changes: 19 additions & 0 deletions test/parallel/test-stream-pipeline.js
@@ -11,10 +11,12 @@ const {
Duplex,
addAbortSignal,
} = require('stream');
const pipelinep = require('stream/promises').pipeline;
const assert = require('assert');
const http = require('http');
const { promisify } = require('util');
const net = require('net');
const tsp = require('timers/promises');

{
let finished = false;
@@ -1420,3 +1422,20 @@ const net = require('net');

writableLike.emit('close');
}

{
const ac = new AbortController();
const signal = ac.signal;
pipelinep(
async function * (signal) {
await tsp.setTimeout(1e6, signal);
},
async function(source) {

},
{ signal }
).catch(common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
ac.abort();
}
