Skip to content

Commit

Permalink
stream: add readableDidRead
Browse files Browse the repository at this point in the history
Adds readableDidRead to streams and applies usage to http, http2 and quic.

PR-URL: nodejs#36820
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
ronag committed Aug 23, 2021
1 parent 7410d41 commit f0201c1
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 1 deletion.
1 change: 1 addition & 0 deletions lib/_http_incoming.js
Expand Up @@ -87,6 +87,7 @@ function IncomingMessage(socket) {
this.statusMessage = null;
this.client = socket;

// TODO: Deprecate and remove.
this._consuming = false;
// Flag for when we decide that this message cannot possibly be
// read by the user, so there's no point continuing to handle it.
Expand Down
2 changes: 1 addition & 1 deletion lib/_http_server.js
Expand Up @@ -804,7 +804,7 @@ function resOnFinish(req, res, socket, state, server) {
// If the user never called req.read(), and didn't pipe() or
// .resume() or .on('data'), then we call req._dump() so that the
// bytes will be pulled off the wire.
if (!req._consuming && !req._readableState.resumeScheduled)
if (!req.readableDidRead)
req._dump();

// Make sure the requestTimeout is cleared before finishing.
Expand Down
9 changes: 9 additions & 0 deletions lib/internal/streams/readable.js
Expand Up @@ -527,6 +527,8 @@ Readable.prototype.read = function(n) {
this.emit('data', ret);
}

state.didRead = true;

return ret;
};

Expand Down Expand Up @@ -830,7 +832,9 @@ function pipeOnDrain(src, dest) {

if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
EE.listenerCount(src, 'data')) {
// TODO(ronag): Call resume() instead?
state.flowing = true;
state.didRead = true;
flow(src);
}
};
Expand Down Expand Up @@ -978,6 +982,7 @@ Readable.prototype.resume = function() {
function resume(stream, state) {
if (!state.resumeScheduled) {
state.resumeScheduled = true;
state.didRead = true;
process.nextTick(resume_, stream, state);
}
}
Expand Down Expand Up @@ -1191,12 +1196,16 @@ ObjectDefineProperties(Readable.prototype, {
readableDidRead: {
enumerable: false,
get: function() {
<<<<<<< HEAD
return (
this._readableState.dataEmitted ||
this._readableState.endEmitted ||
this._readableState.errorEmitted ||
this._readableState.closeEmitted
);
=======
return this._readableState.didRead;
>>>>>>> 8fe7d23c17... stream: add readableDidRead
}
},

Expand Down

0 comments on commit f0201c1

Please sign in to comment.