Skip to content

Commit

Permalink
stream: eof & pipeline compat
Browse files Browse the repository at this point in the history
Some broken streams might emit 'close' before 'end' or 'finish'.
However, if they have actually been ended or finished, this
should not result in a premature close error.

Fixes: #29699
  • Loading branch information
ronag committed Jan 18, 2020
1 parent ba74fd8 commit fa1239e
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 14 deletions.
10 changes: 10 additions & 0 deletions doc/api/stream.md
Expand Up @@ -1525,6 +1525,11 @@ Especially useful in error handling scenarios where a stream is destroyed
prematurely (like an aborted HTTP request), and will not emit `'end'`
or `'finish'`.

`stream.finished()` will error with `ERR_STREAM_PREMATURE_CLOSE` if:
* `Writable` emits `'close'` before `'finish'` and
[`writable.writableFinished`][].
* `Readable` emits `'close'` before `'end'` and [`readable.readableEnded`][].

The `finished` API is promisify-able as well;

```js
Expand Down Expand Up @@ -1647,6 +1652,10 @@ run().catch(console.error);
* `Readable` streams which have emitted `'end'` or `'close'`.
* `Writable` streams which have emitted `'finish'` or `'close'`.

If any `Writable` or `Readable` stream emits `'close'` without being able to
fully flush or drain, `stream.pipeline()` will error with
`ERR_STREAM_PREMATURE_CLOSE`.

`stream.pipeline()` leaves dangling event listeners on the streams
after the `callback` has been invoked. In the case of reuse of streams after
failure, this can cause event listener leaks and swallowed errors.
Expand Down Expand Up @@ -2865,6 +2874,7 @@ contain multi-byte characters.
[`process.stdout`]: process.html#process_process_stdout
[`readable._read()`]: #stream_readable_read_size_1
[`readable.push('')`]: #stream_readable_push
[`readable.readableEnded`]: #stream_readable_readableended
[`readable.setEncoding()`]: #stream_readable_setencoding_encoding
[`stream.Readable.from()`]: #stream_stream_readable_from_iterable_options
[`stream.cork()`]: #stream_writable_cork
Expand Down
22 changes: 18 additions & 4 deletions lib/internal/streams/end-of-stream.js
Expand Up @@ -79,10 +79,24 @@ function eos(stream, opts, callback) {
};

const onclose = () => {
if (readable && !readableEnded) {
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
} else if (writable && !writableFinished) {
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
if (readable) {
const ended = (stream._readableState &&
stream._readableState.endEmitted) || stream.readableEnded;
if (!ended && !readableEnded) {
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
} else if (!readableEnded) {
// Compat. Stream has ended but haven't emitted 'end'.
callback.call(stream);
}
} else if (writable) {
const finished = (stream._writableState &&
stream._writableState.finished) || stream.writableFinished;
if (!finished && !writableFinished) {
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
} else if (!writableFinished) {
// Compat. Stream has finished but haven't emitted 'finish'.
callback.call(stream);
}
}
};

Expand Down
23 changes: 23 additions & 0 deletions lib/internal/streams/pipeline.js
Expand Up @@ -46,6 +46,29 @@ function destroyer(stream, reading, writing, callback) {

if (eos === undefined) eos = require('internal/streams/end-of-stream');
eos(stream, { readable: reading, writable: writing }, (err) => {
if (
err &&
err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
reading &&
(stream._readableState && stream._readableState.ended)
) {
// Some readable streams will emit 'close' before 'end'. However, since
// this is on the readable side 'end' should still be emitted if the
// stream has been ended and no error emitted. This should be allowed in
// favor of backwards compatibility. Since the stream is piped to a
// destination this should not result in any observable difference.
// We don't need to check if this is a writable premature close since
// eos will only fail with premature close on the reading side for
// duplex streams.
stream
.on('end', () => {
closed = true;
callback();
})
.on('error', callback);
return;
}

if (err) return callback(err);
closed = true;
callback();
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-http-client-finished.js
Expand Up @@ -55,7 +55,7 @@ const { finished } = require('stream');
}).end();
finished(req, (err) => {
common.expectsError({
type: Error,
name: 'Error',
code: 'ERR_STREAM_PREMATURE_CLOSE'
})(err);
finished(req, common.mustCall(() => {
Expand Down
17 changes: 8 additions & 9 deletions test/parallel/test-stream-finished.js
Expand Up @@ -273,27 +273,26 @@ const { promisify } = require('util');
}

{
// Premature close if stream never emitted 'finish'
// even if writableFinished says something else.
// No error if stream never emitted 'end'
// even if readableEnded says something else.

const streamLike = new EE();
streamLike.writable = true;
finished(streamLike, common.expectsError({
code: 'ERR_STREAM_PREMATURE_CLOSE'
finished(streamLike, common.mustCall((err) => {
assert.strictEqual(err, undefined);
}));
streamLike.writableFinished = true;
streamLike.emit('close');
}


{
// Premature close if stream never emitted 'end'
// even if readableEnded says something else.
// No error if stream never emitted 'finished'
// even if writableFinished says something else.

const streamLike = new EE();
streamLike.readable = true;
finished(streamLike, common.expectsError({
code: 'ERR_STREAM_PREMATURE_CLOSE'
finished(streamLike, common.mustCall((err) => {
assert.strictEqual(err, undefined);
}));
streamLike.readableEnded = true;
streamLike.emit('close');
Expand Down
31 changes: 31 additions & 0 deletions test/parallel/test-stream-pipeline.js
Expand Up @@ -912,4 +912,35 @@ const { promisify } = require('util');
}, common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));
// Make sure 'close' before 'end' finishes without error
// if readable has received eof.
// Ref: https://github.com/nodejs/node/issues/29699
const r = new Readable();
const w = new Writable({
write(chunk, encoding, cb) {
cb();
}
});
pipeline(r, w, (err) => {
assert.strictEqual(err, undefined);
});
r.push(null);
r.destroy();
}

{
// Make sure 'close' before 'end' finishes without error
// if readable has received eof.
// Ref: https://github.com/nodejs/node/issues/29699
const r = new Readable();
const w = new Writable({
write(chunk, encoding, cb) {
cb();
}
});
pipeline(r, w, (err) => {
assert.strictEqual(err, undefined);
});
r.push(null);
r.emit('close');
}

0 comments on commit fa1239e

Please sign in to comment.