Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: allow readable to end early without error #40881

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
119 changes: 64 additions & 55 deletions lib/internal/streams/pipeline.js
Expand Up @@ -33,52 +33,26 @@ const {
isIterable,
isReadableNodeStream,
isNodeStream,
isReadableFinished,
} = require('internal/streams/utils');
const { AbortController } = require('internal/abort_controller');

let PassThrough;
let Readable;

function destroyer(stream, reading, writing, callback) {
callback = once(callback);

function destroyer(stream, reading, writing) {
let finished = false;
stream.on('close', () => {
finished = true;
});

eos(stream, { readable: reading, writable: writing }, (err) => {
finished = !err;

const rState = stream._readableState;
if (
err &&
err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
reading &&
(rState && rState.ended && !rState.errored && !rState.errorEmitted)
) {
// Some readable streams will emit 'close' before 'end'. However, since
// this is on the readable side 'end' should still be emitted if the
// stream has been ended and no error emitted. This should be allowed in
// favor of backwards compatibility. Since the stream is piped to a
// destination this should not result in any observable difference.
// We don't need to check if this is a writable premature close since
// eos will only fail with premature close on the reading side for
// duplex streams.
stream
.once('end', callback)
.once('error', callback);
} else {
callback(err);
}
});

return (err) => {
if (finished) return;
finished = true;
destroyImpl.destroyer(stream, err);
callback(err || new ERR_STREAM_DESTROYED('pipe'));
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
};
}

Expand Down Expand Up @@ -109,7 +83,7 @@ async function* fromReadable(val) {
yield* Readable.prototype[SymbolAsyncIterator].call(val);
}

async function pump(iterable, writable, finish, opts) {
async function pump(iterable, writable, finish, { end }) {
let error;
let onresolve = null;

Expand Down Expand Up @@ -153,7 +127,7 @@ async function pump(iterable, writable, finish, opts) {
}
}

if (opts?.end !== false) {
if (end) {
writable.end();
}

Expand Down Expand Up @@ -220,7 +194,7 @@ function pipelineImpl(streams, callback, opts) {
ac.abort();

if (final) {
callback(error, value);
process.nextTick(callback, error, value);
}
}

Expand All @@ -233,18 +207,19 @@ function pipelineImpl(streams, callback, opts) {

if (isNodeStream(stream)) {
if (end) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, (err) => {
if (!err && !reading && isReadableFinished(stream, false)) {
stream.read(0);
destroyer(stream, true, writing, finish);
} else {
finish(err);
}
}));
} else {
stream.on('error', finish);
destroys.push(destroyer(stream, reading, writing));
}

// Catch stream errors that occur after pipe/pump has completed.
stream.on('error', (err) => {
if (
err &&
err.name !== 'AbortError' &&
err.code !== 'ERR_STREAM_PREMATURE_CLOSE'
) {
finish(err);
}
});
}

if (i === 0) {
Expand Down Expand Up @@ -286,15 +261,18 @@ function pipelineImpl(streams, callback, opts) {
// second use.
const then = ret?.then;
if (typeof then === 'function') {
finishCount++;
then.call(ret,
(val) => {
value = val;
pt.write(val);
if (end) {
pt.end();
}
process.nextTick(finish);
}, (err) => {
pt.destroy(err);
process.nextTick(finish, err);
},
);
} else if (isIterable(ret, true)) {
Expand All @@ -307,24 +285,18 @@ function pipelineImpl(streams, callback, opts) {

ret = pt;

finishCount++;
destroys.push(destroyer(ret, false, true, finish));
destroys.push(destroyer(ret, false, true));
}
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
ret.pipe(stream, { end });

// Compat. Before node v10.12.0 stdio used to throw an error so
// pipe() did/does not end() stdio destinations.
// Now they allow it but "secretly" don't close the underlying fd.
if (stream === process.stdout || stream === process.stderr) {
ret.on('end', () => stream.end());
}
} else {
ret = makeAsyncIterable(ret);

finishCount += 2;
pipe(ret, stream, finish, { end });
} else if (isIterable(ret)) {
finishCount++;
pump(ret, stream, finish, { end });
} else {
throw new ERR_INVALID_ARG_TYPE(
'val', ['Readable', 'Iterable', 'AsyncIterable'], ret);
}
ret = stream;
} else {
Expand All @@ -339,4 +311,41 @@ function pipelineImpl(streams, callback, opts) {
return ret;
}

function pipe(src, dst, finish, { end }) {
src.pipe(dst, { end });

if (end) {
// Compat. Before node v10.12.0 stdio used to throw an error so
// pipe() did/does not end() stdio destinations.
// Now they allow it but "secretly" don't close the underlying fd.
src.once('end', () => dst.end());
} else {
finish();
}

eos(src, { readable: true, writable: false }, (err) => {
const rState = src._readableState;
if (
err &&
err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
(rState && rState.ended && !rState.errored && !rState.errorEmitted)
) {
// Some readable streams will emit 'close' before 'end'. However, since
// this is on the readable side 'end' should still be emitted if the
// stream has been ended and no error emitted. This should be allowed in
// favor of backwards compatibility. Since the stream is piped to a
// destination this should not result in any observable difference.
// We don't need to check if this is a writable premature close since
// eos will only fail with premature close on the reading side for
// duplex streams.
src
.once('end', finish)
.once('error', finish);
} else {
finish(err);
}
});
eos(dst, { readable: false, writable: true }, finish);
}

module.exports = { pipelineImpl, pipeline };
28 changes: 26 additions & 2 deletions test/parallel/test-stream-pipeline.js
Expand Up @@ -1027,7 +1027,7 @@ const tsp = require('timers/promises');
const src = new PassThrough();
const dst = new PassThrough();
pipeline(src, dst, common.mustSucceed(() => {
assert.strictEqual(dst.destroyed, true);
assert.strictEqual(dst.destroyed, false);
}));
src.end();
}
Expand Down Expand Up @@ -1462,7 +1462,7 @@ const tsp = require('timers/promises');

await pipelinePromise(read, duplex);

assert.strictEqual(duplex.destroyed, true);
assert.strictEqual(duplex.destroyed, false);
}

run().then(common.mustCall());
Expand All @@ -1488,3 +1488,27 @@ const tsp = require('timers/promises');

run().then(common.mustCall());
}

{
const s = new PassThrough({ objectMode: true });
pipeline(async function*() {
await Promise.resolve();
yield 'hello';
yield 'world';
yield 'world';
}, s, async function(source) {
let ret = '';
let n = 0;
for await (const chunk of source) {
if (n++ > 1) {
break;
}
ret += chunk;
}
return ret;
}, common.mustCall((err, val) => {
assert.strictEqual(err, undefined);
assert.strictEqual(val, 'helloworld');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test seems to be broken. val is undefined, I'm not sure why the thrown exception does not make the process exit. I did not investigate.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea.. that's weird. Fixed the test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated test looks good, but why the process did not exit?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can reproduce the same issue with current Node.js version (v17.2.0) so it is not related to this PR but it is something to investigate and fix.

Copy link
Member

@lpinca lpinca Dec 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the error thrown in the callback is caught and swallowed here

} catch (err) {
finish(error !== err ? aggregateTwoErrors(error, err) : err);
.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, PTAL, f0ec29b

assert.strictEqual(s.destroyed, true);
}));
}