Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[x] stream: eof & pipeline compat #29724

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions doc/api/stream.md
Expand Up @@ -1525,6 +1525,11 @@ Especially useful in error handling scenarios where a stream is destroyed
prematurely (like an aborted HTTP request), and will not emit `'end'`
or `'finish'`.

`stream.finished()` will error with `ERR_STREAM_PREMATURE_CLOSE` if:
* `Writable` emits `'close'` before `'finish'` and
[`writable.writableFinished`][].
* `Readable` emits `'close'` before `'end'` and [`readable.readableEnded`][].

The `finished` API is promisify-able as well;

```js
Expand Down Expand Up @@ -1647,6 +1652,10 @@ run().catch(console.error);
* `Readable` streams which have emitted `'end'` or `'close'`.
* `Writable` streams which have emitted `'finish'` or `'close'`.

If any `Writable` or `Readable` stream emits `'close'` without being able to
fully flush or drain, `stream.pipeline()` will error with
`ERR_STREAM_PREMATURE_CLOSE`.
mcollina marked this conversation as resolved.
Show resolved Hide resolved

`stream.pipeline()` leaves dangling event listeners on the streams
after the `callback` has been invoked. In the case of reuse of streams after
failure, this can cause event listener leaks and swallowed errors.
Expand Down Expand Up @@ -2865,6 +2874,7 @@ contain multi-byte characters.
[`process.stdout`]: process.html#process_process_stdout
[`readable._read()`]: #stream_readable_read_size_1
[`readable.push('')`]: #stream_readable_push
[`readable.readableEnded`]: #stream_readable_readableended
[`readable.setEncoding()`]: #stream_readable_setencoding_encoding
[`stream.Readable.from()`]: #stream_stream_readable_from_iterable_options
[`stream.cork()`]: #stream_writable_cork
Expand Down
2 changes: 2 additions & 0 deletions lib/_stream_readable.js
Expand Up @@ -147,6 +147,8 @@ function ReadableState(options, stream, isDuplex) {
// Indicates whether the stream has errored.
this.errored = false;

this.closed = false;

// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
Expand Down
2 changes: 2 additions & 0 deletions lib/_stream_writable.js
Expand Up @@ -175,6 +175,8 @@ function WritableState(options, stream, isDuplex) {
// is disabled we need a way to tell whether the stream has failed.
this.errored = false;

this.closed = false;

// Count buffered requests
this.bufferedRequestCount = 0;

Expand Down
10 changes: 0 additions & 10 deletions lib/internal/streams/async_iterator.js
Expand Up @@ -122,16 +122,6 @@ const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
return() {
return new Promise((resolve, reject) => {
const stream = this[kStream];

// TODO(ronag): Remove this check once finished() handles
// already ended and/or destroyed streams.
const ended = stream.destroyed || stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
if (ended) {
resolve(createIterResult(undefined, true));
return;
}

finished(stream, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
reject(err);
Expand Down
9 changes: 9 additions & 0 deletions lib/internal/streams/destroy.js
Expand Up @@ -73,6 +73,13 @@ function emitCloseNT(self) {
const r = self._readableState;
const w = self._writableState;

if (w) {
w.closed = true;
}
if (r) {
r.closed = true;
}

if ((w && w.emitClose) || (r && r.emitClose)) {
self.emit('close');
}
Expand Down Expand Up @@ -103,6 +110,7 @@ function undestroy() {
if (r) {
r.destroyed = false;
r.errored = false;
r.closed = false;
r.reading = false;
r.ended = false;
r.endEmitted = false;
Expand All @@ -112,6 +120,7 @@ function undestroy() {
if (w) {
w.destroyed = false;
w.errored = false;
w.closed = false;
w.ended = false;
w.ending = false;
w.finalCalled = false;
Expand Down
89 changes: 65 additions & 24 deletions lib/internal/streams/end-of-stream.js
Expand Up @@ -13,6 +13,18 @@ function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}

function isReadable(stream) {
return typeof stream.readable === 'boolean' ||
typeof stream.readableEnded === 'boolean' ||
!!stream._readableState;
}

function isWritable(stream) {
return typeof stream.writable === 'boolean' ||
typeof stream.writableEnded === 'boolean' ||
!!stream._writableState;
}

function eos(stream, opts, callback) {
if (arguments.length === 2) {
callback = opts;
Expand All @@ -28,43 +40,72 @@ function eos(stream, opts, callback) {

callback = once(callback);

let readable = opts.readable || (opts.readable !== false && stream.readable);
let writable = opts.writable || (opts.writable !== false && stream.writable);
const onerror = (err) => {
callback.call(stream, err);
};

const w = stream._writableState;
const r = stream._readableState;

let writableFinished = stream.writableFinished || (w && w.finished);
let readableEnded = stream.readableEnded || (r && r.endEmitted);

const errorEmitted = (w && w.errorEmitted) || (r && r.errorEmitted);
const closed = (w && w.closed) || (r && r.closed);

if (writableFinished || readableEnded || errorEmitted || closed) {
// TODO(ronag): rethrow if errorEmitted?
// TODO(ronag): premature close if closed but not
// errored, finished or ended?

// Swallow any error past this point.
if (opts.error !== false) stream.on('error', onerror);

process.nextTick(callback.bind(stream));

return () => {
stream.removeListener('error', onerror);
};
}

const readable = opts.readable ||
(opts.readable !== false && isReadable(stream));
const writable = opts.writable ||
(opts.writable !== false && isWritable(stream));

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};

let writableEnded = stream._writableState && stream._writableState.finished;
const onfinish = () => {
writable = false;
writableEnded = true;
if (!readable) callback.call(stream);
writableFinished = true;
if (!readable || readableEnded) callback.call(stream);
};

let readableEnded = stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
const onend = () => {
readable = false;
readableEnded = true;
if (!writable) callback.call(stream);
};

const onerror = (err) => {
callback.call(stream, err);
if (!writable || writableFinished) callback.call(stream);
};

const onclose = () => {
let err;
if (readable && !readableEnded) {
if (!stream._readableState || !stream._readableState.ended)
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
}
if (writable && !writableEnded) {
if (!stream._writableState || !stream._writableState.ended)
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
if (readable) {
const ended = (stream._readableState &&
stream._readableState.endEmitted) || stream.readableEnded;
if (!ended && !readableEnded) {
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
} else if (!readableEnded) {
// Compat. Stream has ended but haven't emitted 'end'.
callback.call(stream);
}
} else if (writable) {
const finished = (stream._writableState &&
stream._writableState.finished) || stream.writableFinished;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ws.finished in not the same as ws.ended is it? the old compat was to ignore premature close if the stream had been ended (ie ws.end() was called)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mafintosh: correct, the workaround for that is that pipeline ignores that premature error in a safe way. See the changes in pipeline.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That way we get "correct" behaviour from eos while not breaking and still having "correct" behaviour in pipeline.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, okay, let me try to grok that a bit.

Thinking out loud it would be really nice to check if a stream is autoDestroy'ing (which I think at some point should be the default) and then simply wait for close always as the terminating event. Should be much simpler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we could keep the compat mode around for "old" streams but autoDestroying stream would run down the simple path.

Copy link
Member Author

@ronag ronag Nov 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow this. Did you want something changed in this PR and if so can you describe it further, or was this just thinking out loud for future changes?

Copy link
Member Author

@ronag ronag Dec 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mafintosh Any further "groking"?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point was that is a stream has autoDestroy set to true, which is probably gonna be the default behaviour at some some, we just need to check if close has been emitted so all of this becomes much easier.

I do think that's worth adding relatively soon.

if (!finished && !writableFinished) {
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
} else if (!writableFinished) {
// Compat. Stream has finished but haven't emitted 'finish'.
callback.call(stream);
}
}
};

Expand Down
23 changes: 23 additions & 0 deletions lib/internal/streams/pipeline.js
Expand Up @@ -46,6 +46,29 @@ function destroyer(stream, reading, writing, callback) {

if (eos === undefined) eos = require('internal/streams/end-of-stream');
eos(stream, { readable: reading, writable: writing }, (err) => {
if (
err &&
err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
reading &&
(stream._readableState && stream._readableState.ended)
) {
// Some readable streams will emit 'close' before 'end'. However, since
// this is on the readable side 'end' should still be emitted if the
// stream has been ended and no error emitted. This should be allowed in
// favor of backwards compatibility. Since the stream is piped to a
// destination this should not result in any observable difference.
// We don't need to check if this is a writable premature close since
// eos will only fail with premature close on the reading side for
// duplex streams.
stream
.on('end', () => {
closed = true;
callback();
})
.on('error', callback);
return;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer if this logic lived in end-of-stream instead.

Copy link
Member Author

@ronag ronag Sep 27, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? I can't think of a way to do it in end-of-stream while both keeping compat and "correct" behaviour at the same time. The current proposal I believe is achieving best of both.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mafintosh: See here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I completely follow this still


if (err) return callback(err);
closed = true;
callback();
Expand Down
106 changes: 106 additions & 0 deletions test/parallel/test-http-client-finished.js
Expand Up @@ -25,3 +25,109 @@ const { finished } = require('stream');
.end();
}));
}

{
// Test abort before finished.

const server = http.createServer(function(req, res) {
});

server.listen(0, common.mustCall(function() {
const req = http.request({
port: this.address().port
}, common.mustNotCall());
req.abort();
finished(req, common.mustCall(() => {
server.close();
}));
}));
}

{
// Test abort after request.

const server = http.createServer(function(req, res) {
});

server.listen(0, common.mustCall(function() {
const req = http.request({
port: this.address().port
}).end();
finished(req, (err) => {
common.expectsError({
name: 'Error',
code: 'ERR_STREAM_PREMATURE_CLOSE'
})(err);
finished(req, common.mustCall(() => {
server.close();
}));
});
req.abort();
}));
}

{
// Test abort before end.

const server = http.createServer(function(req, res) {
res.write('test');
});

server.listen(0, common.mustCall(function() {
const req = http.request({
port: this.address().port
}).on('response', common.mustCall((res) => {
req.abort();
finished(res, common.mustCall(() => {
finished(res, common.mustCall(() => {
server.close();
}));
}));
})).end();
}));
}

{
// Test destroy before end.

const server = http.createServer(function(req, res) {
res.write('test');
});

server.listen(0, common.mustCall(function() {
http.request({
port: this.address().port
}).on('response', common.mustCall((res) => {
// TODO(ronag): Bug? Won't emit 'close' unless read.
res.on('data', () => {});
res.destroy();
finished(res, common.mustCall(() => {
finished(res, common.mustCall(() => {
server.close();
}));
}));
})).end();
}));
}

{
// Test finish after end.

const server = http.createServer(function(req, res) {
res.end('asd');
});

server.listen(0, common.mustCall(function() {
http.request({
port: this.address().port
}).on('response', common.mustCall((res) => {
// TODO(ronag): Bug? Won't emit 'close' unless read.
res.on('data', () => {});
finished(res, common.mustCall(() => {
finished(res, common.mustCall(() => {
server.close();
}));
}));
})).end();
}));
}