From c38fad2e259231eba547b7a1e260b182b1dde9ad Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Jan 2021 21:35:01 +0100 Subject: [PATCH 1/3] stream: add readableDidRead Adds readableDidRead to streams and applies usage to http, http2 and quic. --- lib/_http_incoming.js | 17 ++++++------- lib/_http_server.js | 2 +- lib/internal/streams/readable.js | 14 +++++++++++ test/parallel/test-stream-readable-didRead.js | 24 +++++++++++++++++++ 4 files changed, 46 insertions(+), 11 deletions(-) create mode 100644 test/parallel/test-stream-readable-didRead.js diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js index 3961b583de9ddc..f3064e41be54aa 100644 --- a/lib/_http_incoming.js +++ b/lib/_http_incoming.js @@ -60,8 +60,6 @@ function IncomingMessage(socket) { Readable.call(this, streamOptions); - this._readableState.readingMore = true; - this.socket = socket; this.httpVersionMajor = null; @@ -88,7 +86,6 @@ function IncomingMessage(socket) { this.statusMessage = null; this.client = socket; - this._consuming = false; // Flag for when we decide that this message cannot possibly be // read by the user, so there's no point continuing to handle it. this._dumped = false; @@ -96,6 +93,12 @@ function IncomingMessage(socket) { ObjectSetPrototypeOf(IncomingMessage.prototype, Readable.prototype); ObjectSetPrototypeOf(IncomingMessage, Readable); +ObjectDefineProperty(IncomingMessage.prototype, '_consuming', { + get() { + return this.readableDidRead; + } +}); + ObjectDefineProperty(IncomingMessage.prototype, 'connection', { get: function() { return this.socket; @@ -159,16 +162,10 @@ IncomingMessage.prototype.setTimeout = function setTimeout(msecs, callback) { // version. // Ref: https://v8.dev/blog/v8-release-89 IncomingMessage.prototype._read = function _read(n) { - if (!this._consuming) { - this._readableState.readingMore = false; - this._consuming = true; - } - // We actually do almost nothing here, because the parserOnBody // function fills up our internal buffer directly. However, we // do need to unpause the underlying socket so that it flows. - if (this.socket.readable) - readStart(this.socket); + readStart(this.socket); }; // It's possible that the socket will be destroyed, and removed from diff --git a/lib/_http_server.js b/lib/_http_server.js index 64cd44c066cf8a..8d9321d28b9919 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -802,7 +802,7 @@ function resOnFinish(req, res, socket, state, server) { // If the user never called req.read(), and didn't pipe() or // .resume() or .on('data'), then we call req._dump() so that the // bytes will be pulled off the wire. - if (!req._consuming && !req._readableState.resumeScheduled) + if (!req.readableDidRead) req._dump(); // Make sure the requestTimeout is cleared before finishing. diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 7f6876599cc7fc..9ed171dfced9dd 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -167,6 +167,8 @@ function ReadableState(options, stream, isDuplex) { // If true, a maybeReadMore has been scheduled. this.readingMore = false; + this.didRead = false; + this.decoder = null; this.encoding = null; if (options && options.encoding) { @@ -520,6 +522,8 @@ Readable.prototype.read = function(n) { if (ret !== null) this.emit('data', ret); + state.didRead = true; + return ret; }; @@ -823,7 +827,9 @@ function pipeOnDrain(src, dest) { if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && EE.listenerCount(src, 'data')) { + // TODO(ronag): Call resume() instead? state.flowing = true; + state.didRead = true; flow(src); } }; @@ -971,6 +977,7 @@ Readable.prototype.resume = function() { function resume(stream, state) { if (!state.resumeScheduled) { state.resumeScheduled = true; + state.didRead = true; process.nextTick(resume_, stream, state); } } @@ -1181,6 +1188,13 @@ ObjectDefineProperties(Readable.prototype, { } }, + readableDidRead: { + enumerable: false, + get: function() { + return this._readableState.didRead; + } + }, + readableHighWaterMark: { enumerable: false, get: function() { diff --git a/test/parallel/test-stream-readable-didRead.js b/test/parallel/test-stream-readable-didRead.js new file mode 100644 index 00000000000000..18e2da97e88e94 --- /dev/null +++ b/test/parallel/test-stream-readable-didRead.js @@ -0,0 +1,24 @@ +'use strict'; +require('../common'); +const assert = require('assert'); +const Readable = require('stream').Readable; + +{ + const readable = new Readable({ + read: () => {} + }); + + assert.strictEqual(readable.readableDidRead, false); + readable.read(); + assert.strictEqual(readable.readableDidRead, true); +} + +{ + const readable = new Readable({ + read: () => {} + }); + + assert.strictEqual(readable.readableDidRead, false); + readable.resume(); + assert.strictEqual(readable.readableDidRead, true); +} From c7415627bde602142cbdbe979e9935db5e362a4c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 30 Jun 2021 08:49:21 +0200 Subject: [PATCH 2/3] fixup --- lib/_http_incoming.js | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js index f3064e41be54aa..3961b583de9ddc 100644 --- a/lib/_http_incoming.js +++ b/lib/_http_incoming.js @@ -60,6 +60,8 @@ function IncomingMessage(socket) { Readable.call(this, streamOptions); + this._readableState.readingMore = true; + this.socket = socket; this.httpVersionMajor = null; @@ -86,6 +88,7 @@ function IncomingMessage(socket) { this.statusMessage = null; this.client = socket; + this._consuming = false; // Flag for when we decide that this message cannot possibly be // read by the user, so there's no point continuing to handle it. this._dumped = false; @@ -93,12 +96,6 @@ function IncomingMessage(socket) { ObjectSetPrototypeOf(IncomingMessage.prototype, Readable.prototype); ObjectSetPrototypeOf(IncomingMessage, Readable); -ObjectDefineProperty(IncomingMessage.prototype, '_consuming', { - get() { - return this.readableDidRead; - } -}); - ObjectDefineProperty(IncomingMessage.prototype, 'connection', { get: function() { return this.socket; @@ -162,10 +159,16 @@ IncomingMessage.prototype.setTimeout = function setTimeout(msecs, callback) { // version. // Ref: https://v8.dev/blog/v8-release-89 IncomingMessage.prototype._read = function _read(n) { + if (!this._consuming) { + this._readableState.readingMore = false; + this._consuming = true; + } + // We actually do almost nothing here, because the parserOnBody // function fills up our internal buffer directly. However, we // do need to unpause the underlying socket so that it flows. - readStart(this.socket); + if (this.socket.readable) + readStart(this.socket); }; // It's possible that the socket will be destroyed, and removed from From d946047969aa5dd034080c8cc5ac5ab1b90cae8c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 13 Jul 2021 23:53:01 +0200 Subject: [PATCH 3/3] fixup: TODO --- lib/_http_incoming.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js index 3961b583de9ddc..fa7f314ba80040 100644 --- a/lib/_http_incoming.js +++ b/lib/_http_incoming.js @@ -88,6 +88,7 @@ function IncomingMessage(socket) { this.statusMessage = null; this.client = socket; + // TODO: Deprecate and remove. this._consuming = false; // Flag for when we decide that this message cannot possibly be // read by the user, so there's no point continuing to handle it.