Skip to content

Commit

Permalink
stream: add readableDidRead
Browse files Browse the repository at this point in the history
Adds readableDidRead to streams and applies usage to http, http2 and quic.
  • Loading branch information
ronag committed Jan 6, 2021
1 parent db79783 commit bbdf58e
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 31 deletions.
17 changes: 7 additions & 10 deletions lib/_http_incoming.js
Expand Up @@ -61,8 +61,6 @@ function IncomingMessage(socket) {

FunctionPrototypeCall(Readable, this, streamOptions);

this._readableState.readingMore = true;

this.socket = socket;

this.httpVersionMajor = null;
Expand All @@ -89,14 +87,19 @@ function IncomingMessage(socket) {
this.statusMessage = null;
this.client = socket;

this._consuming = false;
// Flag for when we decide that this message cannot possibly be
// read by the user, so there's no point continuing to handle it.
this._dumped = false;
}
ObjectSetPrototypeOf(IncomingMessage.prototype, Readable.prototype);
ObjectSetPrototypeOf(IncomingMessage, Readable);

ObjectDefineProperty(IncomingMessage.prototype, '_consuming', {
get() {
return this.readableDidRead;
}
});

ObjectDefineProperty(IncomingMessage.prototype, 'connection', {
get: function() {
return this.socket;
Expand Down Expand Up @@ -153,16 +156,10 @@ IncomingMessage.prototype.setTimeout = function setTimeout(msecs, callback) {


IncomingMessage.prototype._read = function _read(n) {
if (!this._consuming) {
this._readableState.readingMore = false;
this._consuming = true;
}

// We actually do almost nothing here, because the parserOnBody
// function fills up our internal buffer directly. However, we
// do need to unpause the underlying socket so that it flows.
if (this.socket.readable)
readStart(this.socket);
readStart(this.socket);
};

// It's possible that the socket will be destroyed, and removed from
Expand Down
13 changes: 2 additions & 11 deletions lib/internal/http2/core.js
Expand Up @@ -546,7 +546,7 @@ function onStreamClose(code) {
// session) then just dump the incoming data so that the stream can
// be destroyed.
if (stream[kSession][kType] === NGHTTP2_SESSION_SERVER &&
!stream[kState].didRead &&
!stream.readableDidRead &&
stream.readableFlowing === null)
stream.resume();
else
Expand Down Expand Up @@ -1889,14 +1889,9 @@ class Http2Stream extends Duplex {
this[kSession] = session;
session[kState].pendingStreams.add(this);

// Allow our logic for determining whether any reads have happened to
// work in all situations. This is similar to what we do in _http_incoming.
this._readableState.readingMore = true;

this[kTimeout] = null;

this[kState] = {
didRead: false,
flags: STREAM_FLAGS_PENDING,
rstCode: NGHTTP2_NO_ERROR,
writeQueueSize: 0,
Expand Down Expand Up @@ -2132,10 +2127,6 @@ class Http2Stream extends Duplex {
this.push(null);
return;
}
if (!this[kState].didRead) {
this._readableState.readingMore = false;
this[kState].didRead = true;
}
if (!this.pending) {
FunctionPrototypeCall(streamOnResume, this);
} else {
Expand Down Expand Up @@ -2298,7 +2289,7 @@ class Http2Stream extends Duplex {
this[kSession] &&
this[kSession][kType] === NGHTTP2_SESSION_SERVER &&
!(state.flags & STREAM_FLAGS_HAS_TRAILERS) &&
!state.didRead &&
!this.readableDidRead &&
this.readableFlowing === null) {
// By using setImmediate we allow pushStreams to make it through
// before the stream is officially closed. This prevents a bug
Expand Down
8 changes: 0 additions & 8 deletions lib/internal/quic/core.js
Expand Up @@ -2522,7 +2522,6 @@ class QuicStream extends Duplex {
closePromiseReject: undefined,
closePromiseResolve: undefined,
defaultEncoding: undefined,
didRead: false,
id: undefined,
highWaterMark: undefined,
push_id: undefined,
Expand Down Expand Up @@ -2560,7 +2559,6 @@ class QuicStream extends Duplex {
state.defaultEncoding = defaultEncoding;
state.session = session;
state.push_id = push_id;
this._readableState.readingMore = true;
this.on('pause', streamOnPause);

if (handle !== undefined)
Expand Down Expand Up @@ -2846,12 +2844,6 @@ class QuicStream extends Duplex {
this.push(null);
return;
}
const state = this[kInternalState];
if (!state.didRead) {
this._readableState.readingMore = false;
state.didRead = true;
}

FunctionPrototypeCall(streamOnResume, this);
}

Expand Down
15 changes: 14 additions & 1 deletion lib/internal/streams/readable.js
Expand Up @@ -166,6 +166,8 @@ function ReadableState(options, stream, isDuplex) {
// If true, a maybeReadMore has been scheduled.
this.readingMore = false;

this.didRead = false;

this.decoder = null;
this.encoding = null;
if (options && options.encoding) {
Expand Down Expand Up @@ -203,7 +205,9 @@ function Readable(options) {
Stream.call(this, options);

destroyImpl.construct(this, () => {
maybeReadMore(this, this._readableState);
if (this._readableState.didRead) {
maybeReadMore(this, this._readableState);
}
});
}

Expand Down Expand Up @@ -401,6 +405,8 @@ Readable.prototype.read = function(n) {
const state = this._readableState;
const nOrig = n;

state.didRead = true;

// If we're asking for more than the current hwm, then raise the hwm.
if (n > state.highWaterMark)
state.highWaterMark = computeNewHighWaterMark(n);
Expand Down Expand Up @@ -1157,6 +1163,13 @@ ObjectDefineProperties(Readable.prototype, {
}
},

readableDidRead: {
enumerable: false,
get: function() {
return this._readableState.didRead;
}
},

readableHighWaterMark: {
enumerable: false,
get: function() {
Expand Down
6 changes: 5 additions & 1 deletion test/parallel/test-stream-readable-data.js
@@ -1,11 +1,15 @@
'use strict';
const common = require('../common');
const assert = require('assert');

const { Readable } = require('stream');

const readable = new Readable({
read() {}
read() {
assert.strictEqual(readable.readableDidRead, true);
}
});
assert.strictEqual(readable.readableDidRead, false);

function read() {}

Expand Down

0 comments on commit bbdf58e

Please sign in to comment.