diff --git a/src/node_http2.cc b/src/node_http2.cc index e8bf89dd077355..6fa552a3156026 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -869,31 +869,52 @@ ssize_t Http2Session::OnCallbackPadding(size_t frameLen, // various callback functions. Each of these will typically result in a call // out to JavaScript so this particular function is rather hot and can be // quite expensive. This is a potential performance optimization target later. -ssize_t Http2Session::Write(const uv_buf_t* bufs, size_t nbufs) { - size_t total = 0; - // Note that nghttp2_session_mem_recv is a synchronous operation that - // will trigger a number of other callbacks. Those will, in turn have +ssize_t Http2Session::ConsumeHTTP2Data() { + CHECK_NOT_NULL(stream_buf_.base); + CHECK_LT(stream_buf_offset_, stream_buf_.len); + size_t read_len = stream_buf_.len - stream_buf_offset_; + // multiple side effects. - for (size_t n = 0; n < nbufs; n++) { - Debug(this, "receiving %d bytes [wants data? %d]", - bufs[n].len, - nghttp2_session_want_read(session_)); - ssize_t ret = - nghttp2_session_mem_recv(session_, - reinterpret_cast(bufs[n].base), - bufs[n].len); - CHECK_NE(ret, NGHTTP2_ERR_NOMEM); - - if (ret < 0) - return ret; + Debug(this, "receiving %d bytes [wants data? %d]", + read_len, + nghttp2_session_want_read(session_)); + flags_ &= ~SESSION_STATE_NGHTTP2_RECV_PAUSED; + ssize_t ret = + nghttp2_session_mem_recv(session_, + reinterpret_cast(stream_buf_.base) + + stream_buf_offset_, + read_len); + CHECK_NE(ret, NGHTTP2_ERR_NOMEM); - total += ret; + if (flags_ & SESSION_STATE_NGHTTP2_RECV_PAUSED) { + CHECK_NE(flags_ & SESSION_STATE_READING_STOPPED, 0); + + CHECK_GT(ret, 0); + CHECK_LE(static_cast(ret), read_len); + + if (static_cast(ret) < read_len) { + // Mark the remainder of the data as available for later consumption. + stream_buf_offset_ += ret; + return ret; + } } + + // We are done processing the current input chunk. + DecrementCurrentSessionMemory(stream_buf_.len); + stream_buf_offset_ = 0; + stream_buf_ab_.Reset(); + free(stream_buf_allocation_.base); + stream_buf_allocation_ = uv_buf_init(nullptr, 0); + stream_buf_ = uv_buf_init(nullptr, 0); + + if (ret < 0) + return ret; + // Send any data that was queued up while processing the received data. if (!IsDestroyed()) { SendPendingData(); } - return total; + return ret; } @@ -1196,8 +1217,18 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle, nghttp2_session_consume_stream(handle, id, avail); else stream->inbound_consumed_data_while_paused_ += avail; + + // If we have a gathered a lot of data for output, try sending it now. + if (session->outgoing_length_ > 4096) session->SendPendingData(); } while (len != 0); + // If we are currently waiting for a write operation to finish, we should + // tell nghttp2 that we want to wait before we process more input data. + if (session->flags_ & SESSION_STATE_WRITE_IN_PROGRESS) { + session->flags_ |= SESSION_STATE_NGHTTP2_RECV_PAUSED; + return NGHTTP2_ERR_PAUSE; + } + return 0; } @@ -1289,6 +1320,7 @@ void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { size_t offset = buf.base - session->stream_buf_.base; // Verify that the data offset is inside the current read buffer. + CHECK_GE(offset, session->stream_buf_offset_); CHECK_LE(offset, session->stream_buf_.len); CHECK_LE(offset + buf.len, session->stream_buf_.len); @@ -1586,6 +1618,11 @@ void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) { stream_->ReadStart(); } + // If there is more incoming data queued up, consume it. + if (stream_buf_offset_ > 0) { + ConsumeHTTP2Data(); + } + if (!(flags_ & SESSION_STATE_WRITE_SCHEDULED)) { // Schedule a new write if nghttp2 wants to send data. MaybeScheduleWrite(); @@ -1643,6 +1680,7 @@ void Http2Session::ClearOutgoing(int status) { if (outgoing_buffers_.size() > 0) { outgoing_storage_.clear(); + outgoing_length_ = 0; std::vector current_outgoing_buffers_; current_outgoing_buffers_.swap(outgoing_buffers_); @@ -1669,6 +1707,11 @@ void Http2Session::ClearOutgoing(int status) { } } +void Http2Session::PushOutgoingBuffer(nghttp2_stream_write&& write) { + outgoing_length_ += write.buf.len; + outgoing_buffers_.emplace_back(std::move(write)); +} + // Queue a given block of data for sending. This always creates a copy, // so it is used for the cases in which nghttp2 requests sending of a // small chunk of data. @@ -1681,7 +1724,7 @@ void Http2Session::CopyDataIntoOutgoing(const uint8_t* src, size_t src_length) { // of the outgoing_buffers_ vector may invalidate the pointer. // The correct base pointers will be set later, before writing to the // underlying socket. - outgoing_buffers_.emplace_back(nghttp2_stream_write { + PushOutgoingBuffer(nghttp2_stream_write { uv_buf_init(nullptr, src_length) }); } @@ -1804,13 +1847,13 @@ int Http2Session::OnSendData( if (write.buf.len <= length) { // This write does not suffice by itself, so we can consume it completely. length -= write.buf.len; - session->outgoing_buffers_.emplace_back(std::move(write)); + session->PushOutgoingBuffer(std::move(write)); stream->queue_.pop(); continue; } // Slice off `length` bytes of the first write in the queue. - session->outgoing_buffers_.emplace_back(nghttp2_stream_write { + session->PushOutgoingBuffer(nghttp2_stream_write { uv_buf_init(write.buf.base, length) }); write.buf.base += length; @@ -1820,7 +1863,7 @@ int Http2Session::OnSendData( if (frame->data.padlen > 0) { // Send padding if that was requested. - session->outgoing_buffers_.emplace_back(nghttp2_stream_write { + session->PushOutgoingBuffer(nghttp2_stream_write { uv_buf_init(const_cast(zero_bytes_256), frame->data.padlen - 1) }); } @@ -1853,8 +1896,6 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { Http2Scope h2scope(this); CHECK_NOT_NULL(stream_); Debug(this, "receiving %d bytes", nread); - CHECK_EQ(stream_buf_allocation_.base, nullptr); - CHECK(stream_buf_ab_.IsEmpty()); // Only pass data on if nread > 0 if (nread <= 0) { @@ -1865,26 +1906,33 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { return; } - // Shrink to the actual amount of used data. uv_buf_t buf = buf_; - buf.base = Realloc(buf.base, nread); - IncrementCurrentSessionMemory(nread); - OnScopeLeave on_scope_leave([&]() { - // Once finished handling this write, reset the stream buffer. - // The memory has either been free()d or was handed over to V8. - // We use `nread` instead of `buf.size()` here, because the buffer is - // cleared as part of the `.ToArrayBuffer()` call below. - DecrementCurrentSessionMemory(nread); + statistics_.data_received += nread; + + if (UNLIKELY(stream_buf_offset_ > 0)) { + // This is a very unlikely case, and should only happen if the ReadStart() + // call in OnStreamAfterWrite() immediately provides data. If that does + // happen, we concatenate the data we received with the already-stored + // pending input data, slicing off the already processed part. + char* new_buf = Malloc(stream_buf_.len - stream_buf_offset_ + nread); + memcpy(new_buf, + stream_buf_.base + stream_buf_offset_, + stream_buf_.len - stream_buf_offset_); + memcpy(new_buf + stream_buf_.len - stream_buf_offset_, + buf.base, + nread); + free(buf.base); + nread = stream_buf_.len - stream_buf_offset_ + nread; + buf = uv_buf_init(new_buf, nread); + stream_buf_offset_ = 0; stream_buf_ab_.Reset(); - free(stream_buf_allocation_.base); - stream_buf_allocation_ = uv_buf_init(nullptr, 0); - stream_buf_ = uv_buf_init(nullptr, 0); - }); + DecrementCurrentSessionMemory(stream_buf_offset_); + } - // Make sure that there was no read previously active. - CHECK_NULL(stream_buf_.base); - CHECK_EQ(stream_buf_.len, 0); + // Shrink to the actual amount of used data. + buf.base = Realloc(buf.base, nread); + IncrementCurrentSessionMemory(nread); // Remember the current buffer, so that OnDataChunkReceived knows the // offset of a DATA frame's data into the socket read buffer. @@ -1903,8 +1951,7 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { // to copy memory. stream_buf_allocation_ = buf; - statistics_.data_received += nread; - ssize_t ret = Write(&stream_buf_, 1); + ssize_t ret = ConsumeHTTP2Data(); if (UNLIKELY(ret < 0)) { Debug(this, "fatal error receiving data: %d", ret); diff --git a/src/node_http2.h b/src/node_http2.h index 8b393458b05198..1526e0b47e5660 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -336,6 +336,7 @@ enum session_state_flags { SESSION_STATE_SENDING = 0x10, SESSION_STATE_WRITE_IN_PROGRESS = 0x20, SESSION_STATE_READING_STOPPED = 0x40, + SESSION_STATE_NGHTTP2_RECV_PAUSED = 0x80 }; typedef uint32_t(*get_setting)(nghttp2_session* session, @@ -775,14 +776,15 @@ class Http2Session : public AsyncWrap, public StreamListener { // Indicates whether there currently exist outgoing buffers for this stream. bool HasWritesOnSocketForStream(Http2Stream* stream); - // Write data to the session - ssize_t Write(const uv_buf_t* bufs, size_t nbufs); + // Write data from stream_buf_ to the session + ssize_t ConsumeHTTP2Data(); void MemoryInfo(MemoryTracker* tracker) const override { tracker->TrackField("streams", streams_); tracker->TrackField("outstanding_pings", outstanding_pings_); tracker->TrackField("outstanding_settings", outstanding_settings_); tracker->TrackField("outgoing_buffers", outgoing_buffers_); + tracker->TrackFieldWithSize("stream_buf", stream_buf_.len); tracker->TrackFieldWithSize("outgoing_storage", outgoing_storage_.size()); tracker->TrackFieldWithSize("pending_rst_streams", pending_rst_streams_.size() * sizeof(int32_t)); @@ -845,6 +847,9 @@ class Http2Session : public AsyncWrap, public StreamListener { } void DecrementCurrentSessionMemory(uint64_t amount) { +#ifdef DEBUG + CHECK_LE(amount, current_session_memory_); +#endif current_session_memory_ -= amount; } @@ -1007,8 +1012,11 @@ class Http2Session : public AsyncWrap, public StreamListener { uint32_t chunks_sent_since_last_write_ = 0; uv_buf_t stream_buf_ = uv_buf_init(nullptr, 0); + // When processing input data, either stream_buf_ab_ or stream_buf_allocation_ + // will be set. stream_buf_ab_ is lazily created from stream_buf_allocation_. v8::Global stream_buf_ab_; uv_buf_t stream_buf_allocation_ = uv_buf_init(nullptr, 0); + size_t stream_buf_offset_ = 0; size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS; std::queue outstanding_pings_; @@ -1018,6 +1026,7 @@ class Http2Session : public AsyncWrap, public StreamListener { std::vector outgoing_buffers_; std::vector outgoing_storage_; + size_t outgoing_length_ = 0; std::vector pending_rst_streams_; // Count streams that have been rejected while being opened. Exceeding a fixed // limit will result in the session being destroyed, as an indication of a @@ -1027,6 +1036,7 @@ class Http2Session : public AsyncWrap, public StreamListener { // Also use the invalid frame count as a measure for rejecting input frames. int32_t invalid_frame_count_ = 0; + void PushOutgoingBuffer(nghttp2_stream_write&& write); void CopyDataIntoOutgoing(const uint8_t* src, size_t src_length); void ClearOutgoing(int status); diff --git a/test/parallel/test-http2-large-write-multiple-requests.js b/test/parallel/test-http2-large-write-multiple-requests.js new file mode 100644 index 00000000000000..0d65c3479b409d --- /dev/null +++ b/test/parallel/test-http2-large-write-multiple-requests.js @@ -0,0 +1,39 @@ +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); + +const fixtures = require('../common/fixtures'); +const assert = require('assert'); +const http2 = require('http2'); + +const content = fixtures.readSync('person-large.jpg'); + +const server = http2.createServer({ + maxSessionMemory: 1000 +}); +server.on('stream', (stream, headers) => { + stream.respond({ + 'content-type': 'image/jpeg', + ':status': 200 + }); + stream.end(content); +}); +server.unref(); + +server.listen(0, common.mustCall(() => { + const client = http2.connect(`http://localhost:${server.address().port}/`); + + let finished = 0; + for (let i = 0; i < 100; i++) { + const req = client.request({ ':path': '/' }).end(); + const chunks = []; + req.on('data', (chunk) => { + chunks.push(chunk); + }); + req.on('end', common.mustCall(() => { + assert.deepStrictEqual(Buffer.concat(chunks), content); + if (++finished === 100) client.close(); + })); + } +}));