streams: pipeline with signal #39067

51 changes: 44 additions & 7 deletions doc/api/stream.md
@@ -1870,16 +1870,14 @@ const { pipeline } = require('stream/promises');

async function run() {
const ac = new AbortController();
const signal = ac.signal;

setTimeout(() => ac.abort(), 1);
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
{ signal },
);
}

@@ -1895,10 +1893,10 @@ const fs = require('fs');
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
fs.createWriteStream('uppercase.txt')
@@ -1909,6 +1907,28 @@ async function run() {
run().catch(console.error);
```

Remember to handle the `signal` argument passed into the async generator.
This is especially important when the async generator is the source for the
pipeline (i.e. the first argument), because otherwise the pipeline will never
complete.

```js
const { pipeline } = require('stream/promises');
const fs = require('fs');

async function run() {
await pipeline(
async function * ({ signal }) {
await someLongRunningFn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt')
);
console.log('Pipeline succeeded.');
}

run().catch(console.error);
```

`stream.pipeline()` will call `stream.destroy(err)` on all streams except:
* `Readable` streams which have emitted `'end'` or `'close'`.
* `Writable` streams which have emitted `'finish'` or `'close'`.
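
For example, a minimal sketch of this rule (illustrative only, not part of this PR's diff): destroying a stream mid-pipeline errors the whole chain, and every stream that has not yet emitted `'end'`/`'finish'` is destroyed along with it.

```js
const { pipeline, Readable, PassThrough, Writable } = require('stream');

const source = new Readable({ read() {} }); // Never ends on its own.
const through = new PassThrough();
const sink = new Writable({ write(chunk, encoding, cb) { cb(); } });

pipeline(source, through, sink, (err) => {
  // None of the streams had emitted 'end' or 'finish',
  // so pipeline calls destroy(err) on each of them.
  console.log(err.message);      // 'boom'
  console.log(source.destroyed); // true
  console.log(sink.destroyed);   // true
});

through.destroy(new Error('boom'));
```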
@@ -3407,13 +3427,20 @@ the `Readable.from()` utility method:
```js
const { Readable } = require('stream');

const ac = new AbortController();
const signal = ac.signal;

async function * generate() {
yield 'a';
await someLongRunningFn({ signal });
yield 'b';
yield 'c';
}

const readable = Readable.from(generate());
readable.on('close', () => {
ac.abort();
});

readable.on('data', (chunk) => {
console.log(chunk);
@@ -3433,21 +3460,31 @@ const { pipeline: pipelinePromise } = require('stream/promises');

const writable = fs.createWriteStream('./file');

const ac = new AbortController();
const signal = ac.signal;

const iterator = createIterator({ signal });

// Callback Pattern
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err);
} else {
console.log(value, 'value returned');
}
}).on('close', () => {
ac.abort();
});

// Promise Pattern
pipelinePromise(iterator, writable)
.then((value) => {
console.log(value, 'value returned');
})
.catch((err) => {
console.error(err);
ac.abort();
});
```

<!--type=misc-->
2 changes: 1 addition & 1 deletion lib/internal/streams/compose.js
@@ -1,6 +1,6 @@
'use strict';

const { pipeline } = require('internal/streams/pipeline');
const Duplex = require('internal/streams/duplex');
const { destroyer } = require('internal/streams/destroy');
const {
18 changes: 14 additions & 4 deletions lib/internal/streams/duplexify.js
@@ -26,6 +26,7 @@ const from = require('internal/streams/from');
const {
isBlob,
} = require('internal/blob');
const { AbortController } = require('internal/abort_controller');

const {
FunctionPrototypeCall
@@ -81,14 +82,15 @@ module.exports = function duplexify(body, name) {
// }

if (typeof body === 'function') {
const { value, write, final, destroy } = fromAsyncGen(body);

if (isIterable(value)) {
return from(Duplexify, value, {
// TODO (ronag): highWaterMark?
objectMode: true,
write,
final,
destroy
});
}

@@ -123,7 +125,8 @@ module.exports = function duplexify(body, name) {
process.nextTick(cb, err);
}
});
},
destroy
});
}

@@ -202,15 +205,18 @@ module.exports = function duplexify(body, name) {

function fromAsyncGen(fn) {
let { promise, resolve } = createDeferredPromise();
const ac = new AbortController();
const signal = ac.signal;
const value = fn(async function*() {
while (true) {
const { chunk, done, cb } = await promise;
process.nextTick(cb);
if (done) return;
if (signal.aborted) throw new AbortError();
// Re-arm the deferred before yielding so a write() that arrives while
// the consumer is still busy finds a fresh promise to resolve.
({ promise, resolve } = createDeferredPromise());
yield chunk;
}
}(), { signal });

return {
value,
@@ -219,6 +225,10 @@ function fromAsyncGen(fn) {
},
final(cb) {
resolve({ done: true, cb });
},
destroy(err, cb) {
ac.abort();
cb(err);
}
};
}
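
For orientation, here is a standalone sketch of the handshake `fromAsyncGen()` implements, using only public APIs (`writableFromAsyncFn` and `deferred` are hypothetical names, not part of this PR): each `write()` resolves a deferred promise that feeds the async generator, and `destroy()` trips the `AbortController` so a stuck consumer can bail out.

```js
const { Writable } = require('stream');

// Hypothetical stand-in for the internal createDeferredPromise().
function deferred() {
  let resolve;
  const promise = new Promise((r) => { resolve = r; });
  return { promise, resolve };
}

function writableFromAsyncFn(fn) {
  let { promise, resolve } = deferred();
  const ac = new AbortController();

  fn(async function* () {
    while (true) {
      const { chunk, done, cb } = await promise;
      process.nextTick(cb);
      if (done) return;
      if (ac.signal.aborted) throw new Error('aborted');
      ({ promise, resolve } = deferred()); // Re-arm before yielding.
      yield chunk;
    }
  }(), { signal: ac.signal }).catch(() => {});

  return new Writable({
    objectMode: true,
    write(chunk, encoding, cb) { resolve({ chunk, done: false, cb }); },
    final(cb) { resolve({ done: true, cb }); },
    destroy(err, cb) { ac.abort(); cb(err); }
  });
}

// Usage: the async fn drains chunks until the writable ends or is destroyed.
const w = writableFromAsyncFn(async (source, { signal }) => {
  for await (const chunk of source) console.log('got', chunk);
});
w.write('hello');
w.end();
```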
40 changes: 35 additions & 5 deletions lib/internal/streams/pipeline.js
@@ -21,15 +21,20 @@ const {
ERR_MISSING_ARGS,
ERR_STREAM_DESTROYED,
},
AbortError,
} = require('internal/errors');

const {
validateCallback,
validateAbortSignal
} = require('internal/validators');

const {
isIterable,
isReadableNodeStream,
isNodeStream,
} = require('internal/streams/utils');
const { AbortController } = require('internal/abort_controller');

let PassThrough;
let Readable;
@@ -168,19 +173,37 @@ function pipeline(...streams) {
streams = streams[0];
}

return pipelineImpl(streams, callback);
}

function pipelineImpl(streams, callback, opts) {
if (streams.length < 2) {
throw new ERR_MISSING_ARGS('streams');
}

const ac = new AbortController();
const signal = ac.signal;
const outerSignal = opts?.signal;

validateAbortSignal(outerSignal, 'options.signal');

function abort() {
finishImpl(new AbortError());
}

outerSignal?.addEventListener('abort', abort);

let error;
let value;
const destroys = [];

let finishCount = 0;

function finish(err) {
finishImpl(err, --finishCount === 0);
}

function finishImpl(err, final) {
if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
error = err;
}
@@ -193,6 +216,9 @@
destroys.shift()(error);
}

outerSignal?.removeEventListener('abort', abort);
ac.abort();

if (final) {
callback(error, value);
}
@@ -211,7 +237,7 @@

if (i === 0) {
if (typeof stream === 'function') {
ret = stream({ signal });
if (!isIterable(ret)) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
@@ -223,7 +249,7 @@
}
} else if (typeof stream === 'function') {
ret = makeAsyncIterable(ret);
ret = stream(ret, { signal });

if (reading) {
if (!isIterable(ret, true)) {
@@ -291,7 +317,11 @@
}
}

if (signal?.aborted || outerSignal?.aborted) {
process.nextTick(abort);
}

return ret;
}

module.exports = { pipelineImpl, pipeline };
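
Taken together, the `{ signal }` plumbing above means an abort on the caller's signal both rejects the pipeline and surfaces inside user-supplied generator functions. A sketch of the observable behavior (illustrative only, not code from this PR):

```js
const { pipeline } = require('stream/promises');

async function run() {
  const ac = new AbortController();
  setTimeout(() => ac.abort(), 50);

  await pipeline(
    async function* ({ signal }) {
      // The inner signal aborts when the outer one fires (or on teardown).
      while (!signal.aborted) {
        yield 'tick';
        await new Promise((resolve) => setTimeout(resolve, 10));
      }
    },
    async function(source) {
      for await (const chunk of source) {
        // Consume until the pipeline is torn down.
      }
    },
    { signal: ac.signal }
  );
}

run().catch((err) => console.log(err.name)); // 'AbortError'
```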
2 changes: 1 addition & 1 deletion lib/stream.js
@@ -29,8 +29,8 @@ const {
promisify: { custom: customPromisify },
} = require('internal/util');

const compose = require('internal/streams/compose');
const { pipeline } = require('internal/streams/pipeline');
const { destroyer } = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');
18 changes: 3 additions & 15 deletions lib/stream/promises.js
@@ -5,20 +5,12 @@ const {
Promise,
} = primordials;

const {
isIterable,
isNodeStream,
} = require('internal/streams/utils');

const { pipelineImpl: pl } = require('internal/streams/pipeline');
const eos = require('internal/streams/end-of-stream');

function pipeline(...streams) {
@@ -29,19 +21,15 @@ function pipeline(...streams) {
!isNodeStream(lastArg) && !isIterable(lastArg)) {
const options = ArrayPrototypePop(streams);
signal = options.signal;
}

pl(streams, (err, value) => {
if (err) {
reject(err);
} else {
resolve(value);
}
}, { signal });
});
}

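With `pipelineImpl` now receiving `{ signal }` directly, validation and pre-abort handling live in one place rather than being bolted on via `addAbortSignalNoValidate`. A usage sketch (file names are placeholders): a signal that is already aborted makes the returned promise reject on the next tick, before any data is transferred.

```js
const { pipeline } = require('stream/promises');
const fs = require('fs');

const ac = new AbortController();
ac.abort(); // Aborted before the pipeline even starts.

pipeline(
  fs.createReadStream('archive.tar'),
  fs.createWriteStream('archive.tar.gz'),
  { signal: ac.signal }
).catch((err) => console.log(err.name)); // 'AbortError'
```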
19 changes: 19 additions & 0 deletions test/parallel/test-stream-pipeline.js
@@ -11,10 +11,12 @@ const {
Duplex,
addAbortSignal,
} = require('stream');
const pipelinep = require('stream/promises').pipeline;
const assert = require('assert');
const http = require('http');
const { promisify } = require('util');
const net = require('net');
const tsp = require('timers/promises');

{
let finished = false;
@@ -1387,3 +1389,20 @@ const net = require('net');
assert.strictEqual(res, content);
}));
}

{
const ac = new AbortController();
const signal = ac.signal;
pipelinep(
async function * ({ signal }) {
await tsp.setTimeout(1e6, undefined, { signal });
},
async function(source) {
// Empty consumer; the pipeline is aborted before any data flows.
},
{ signal }
).catch(common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
ac.abort();
}