From bbdf58e68ca13049d0ac91b7d9b7d759d0615e98 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Jan 2021 21:35:01 +0100 Subject: [PATCH] stream: add readableDidRead Adds readableDidRead to streams and applies usage to http, http2 and quic. --- lib/_http_incoming.js | 17 +++++++---------- lib/internal/http2/core.js | 13 ++----------- lib/internal/quic/core.js | 8 -------- lib/internal/streams/readable.js | 15 ++++++++++++++- test/parallel/test-stream-readable-data.js | 6 +++++- 5 files changed, 28 insertions(+), 31 deletions(-) diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js index 6a870a33697a1f..8c73d03b429748 100644 --- a/lib/_http_incoming.js +++ b/lib/_http_incoming.js @@ -61,8 +61,6 @@ function IncomingMessage(socket) { FunctionPrototypeCall(Readable, this, streamOptions); - this._readableState.readingMore = true; - this.socket = socket; this.httpVersionMajor = null; @@ -89,7 +87,6 @@ function IncomingMessage(socket) { this.statusMessage = null; this.client = socket; - this._consuming = false; // Flag for when we decide that this message cannot possibly be // read by the user, so there's no point continuing to handle it. this._dumped = false; @@ -97,6 +94,12 @@ function IncomingMessage(socket) { ObjectSetPrototypeOf(IncomingMessage.prototype, Readable.prototype); ObjectSetPrototypeOf(IncomingMessage, Readable); +ObjectDefineProperty(IncomingMessage.prototype, '_consuming', { + get() { + return this.readableDidRead; + } +}); + ObjectDefineProperty(IncomingMessage.prototype, 'connection', { get: function() { return this.socket; @@ -153,16 +156,10 @@ IncomingMessage.prototype.setTimeout = function setTimeout(msecs, callback) { IncomingMessage.prototype._read = function _read(n) { - if (!this._consuming) { - this._readableState.readingMore = false; - this._consuming = true; - } - // We actually do almost nothing here, because the parserOnBody // function fills up our internal buffer directly. However, we // do need to unpause the underlying socket so that it flows. - if (this.socket.readable) - readStart(this.socket); + readStart(this.socket); }; // It's possible that the socket will be destroyed, and removed from diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 22ed8086dc9316..19e522bef2e604 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -546,7 +546,7 @@ function onStreamClose(code) { // session) then just dump the incoming data so that the stream can // be destroyed. if (stream[kSession][kType] === NGHTTP2_SESSION_SERVER && - !stream[kState].didRead && + !stream.readableDidRead && stream.readableFlowing === null) stream.resume(); else @@ -1889,14 +1889,9 @@ class Http2Stream extends Duplex { this[kSession] = session; session[kState].pendingStreams.add(this); - // Allow our logic for determining whether any reads have happened to - // work in all situations. This is similar to what we do in _http_incoming. - this._readableState.readingMore = true; - this[kTimeout] = null; this[kState] = { - didRead: false, flags: STREAM_FLAGS_PENDING, rstCode: NGHTTP2_NO_ERROR, writeQueueSize: 0, @@ -2132,10 +2127,6 @@ class Http2Stream extends Duplex { this.push(null); return; } - if (!this[kState].didRead) { - this._readableState.readingMore = false; - this[kState].didRead = true; - } if (!this.pending) { FunctionPrototypeCall(streamOnResume, this); } else { @@ -2298,7 +2289,7 @@ class Http2Stream extends Duplex { this[kSession] && this[kSession][kType] === NGHTTP2_SESSION_SERVER && !(state.flags & STREAM_FLAGS_HAS_TRAILERS) && - !state.didRead && + !this.readableDidRead && this.readableFlowing === null) { // By using setImmediate we allow pushStreams to make it through // before the stream is officially closed. This prevents a bug diff --git a/lib/internal/quic/core.js b/lib/internal/quic/core.js index 0c326872a7a2ea..556526569b066a 100644 --- a/lib/internal/quic/core.js +++ b/lib/internal/quic/core.js @@ -2522,7 +2522,6 @@ class QuicStream extends Duplex { closePromiseReject: undefined, closePromiseResolve: undefined, defaultEncoding: undefined, - didRead: false, id: undefined, highWaterMark: undefined, push_id: undefined, @@ -2560,7 +2559,6 @@ class QuicStream extends Duplex { state.defaultEncoding = defaultEncoding; state.session = session; state.push_id = push_id; - this._readableState.readingMore = true; this.on('pause', streamOnPause); if (handle !== undefined) @@ -2846,12 +2844,6 @@ class QuicStream extends Duplex { this.push(null); return; } - const state = this[kInternalState]; - if (!state.didRead) { - this._readableState.readingMore = false; - state.didRead = true; - } - FunctionPrototypeCall(streamOnResume, this); } diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 6fff20c94ec7f3..9a8b8013dc56a9 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -166,6 +166,8 @@ function ReadableState(options, stream, isDuplex) { // If true, a maybeReadMore has been scheduled. this.readingMore = false; + this.didRead = false; + this.decoder = null; this.encoding = null; if (options && options.encoding) { @@ -203,7 +205,9 @@ function Readable(options) { Stream.call(this, options); destroyImpl.construct(this, () => { - maybeReadMore(this, this._readableState); + if (this._readableState.didRead) { + maybeReadMore(this, this._readableState); + } }); } @@ -401,6 +405,8 @@ Readable.prototype.read = function(n) { const state = this._readableState; const nOrig = n; + state.didRead = true; + // If we're asking for more than the current hwm, then raise the hwm. if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n); @@ -1157,6 +1163,13 @@ ObjectDefineProperties(Readable.prototype, { } }, + readableDidRead: { + enumerable: false, + get: function() { + return this._readableState.didRead; + } + }, + readableHighWaterMark: { enumerable: false, get: function() { diff --git a/test/parallel/test-stream-readable-data.js b/test/parallel/test-stream-readable-data.js index 277adddde63584..b017b2a4a25395 100644 --- a/test/parallel/test-stream-readable-data.js +++ b/test/parallel/test-stream-readable-data.js @@ -1,11 +1,15 @@ 'use strict'; const common = require('../common'); +const assert = require('assert'); const { Readable } = require('stream'); const readable = new Readable({ - read() {} + read() { + assert.strictEqual(readable.readableDidRead, true); + } }); +assert.strictEqual(readable.readableDidRead, false); function read() {}