diff --git a/src/node_http2.cc b/src/node_http2.cc index eae52c66e10590..5cd73ca4c585e1 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -918,31 +918,51 @@ inline ssize_t Http2Session::OnCallbackPadding(size_t frameLen, // various callback functions. Each of these will typically result in a call // out to JavaScript so this particular function is rather hot and can be // quite expensive. This is a potential performance optimization target later. -inline ssize_t Http2Session::Write(const uv_buf_t* bufs, size_t nbufs) { - size_t total = 0; - // Note that nghttp2_session_mem_recv is a synchronous operation that - // will trigger a number of other callbacks. Those will, in turn have - // multiple side effects. - for (size_t n = 0; n < nbufs; n++) { - DEBUG_HTTP2SESSION2(this, "receiving %d bytes [wants data? %d]", - bufs[n].len, - nghttp2_session_want_read(session_)); - ssize_t ret = - nghttp2_session_mem_recv(session_, - reinterpret_cast(bufs[n].base), - bufs[n].len); - CHECK_NE(ret, NGHTTP2_ERR_NOMEM); - - if (ret < 0) - return ret; +ssize_t Http2Session::ConsumeHTTP2Data() { + CHECK_NE(stream_buf_.base, nullptr); + CHECK_LT(stream_buf_offset_, stream_buf_.len); + size_t read_len = stream_buf_.len - stream_buf_offset_; + + DEBUG_HTTP2SESSION2(this, "receiving %d bytes [wants data? %d]", + read_len, + nghttp2_session_want_read(session_)); + flags_ &= ~SESSION_STATE_NGHTTP2_RECV_PAUSED; + ssize_t ret = + nghttp2_session_mem_recv(session_, + reinterpret_cast(stream_buf_.base) + + stream_buf_offset_, + read_len); + CHECK_NE(ret, NGHTTP2_ERR_NOMEM); + + if (flags_ & SESSION_STATE_NGHTTP2_RECV_PAUSED) { + CHECK_NE(flags_ & SESSION_STATE_READING_STOPPED, 0); + + CHECK_GT(ret, 0); + CHECK_LE(static_cast(ret), read_len); - total += ret; + if (static_cast(ret) < read_len) { + // Mark the remainder of the data as available for later consumption. + stream_buf_offset_ += ret; + return ret; + } } + + // We are done processing the current input chunk. + DecrementCurrentSessionMemory(stream_buf_.len); + stream_buf_offset_ = 0; + stream_buf_ab_.Reset(); + free(stream_buf_allocation_.base); + stream_buf_allocation_ = uv_buf_init(nullptr, 0); + stream_buf_ = uv_buf_init(nullptr, 0); + + if (ret < 0) + return ret; + // Send any data that was queued up while processing the received data. if (!IsDestroyed()) { SendPendingData(); } - return total; + return ret; } @@ -1238,6 +1258,7 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle, size_t offset = data - reinterpret_cast(session->stream_buf_.base); // Verify that the data offset is inside the current read buffer. + CHECK_GE(offset, session->stream_buf_offset_); CHECK_LE(offset, session->stream_buf_.len); CHECK_LE(offset + len, session->stream_buf_.len); @@ -1250,6 +1271,16 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle, else nghttp2_session_consume_stream(handle, id, len); + // If we have a gathered a lot of data for output, try sending it now. + if (session->outgoing_length_ > 4096) session->SendPendingData(); + + // If we are currently waiting for a write operation to finish, we should + // tell nghttp2 that we want to wait before we process more input data. + if (session->flags_ & SESSION_STATE_WRITE_IN_PROGRESS) { + session->flags_ |= SESSION_STATE_NGHTTP2_RECV_PAUSED; + return NGHTTP2_ERR_PAUSE; + } + return 0; } @@ -1597,6 +1628,11 @@ void Http2Session::OnStreamAfterWriteImpl(WriteWrap* w, int status, void* ctx) { session->stream_->ReadStart(); } + // If there is more incoming data queued up, consume it. + if (session->stream_buf_offset_ > 0) { + session->ConsumeHTTP2Data(); + } + if (!(session->flags_ & SESSION_STATE_WRITE_SCHEDULED)) { // Schedule a new write if nghttp2 wants to send data. session->MaybeScheduleWrite(); @@ -1654,6 +1690,7 @@ void Http2Session::ClearOutgoing(int status) { if (outgoing_buffers_.size() > 0) { outgoing_storage_.clear(); + outgoing_length_ = 0; std::vector current_outgoing_buffers_; current_outgoing_buffers_.swap(outgoing_buffers_); @@ -1680,6 +1717,11 @@ void Http2Session::ClearOutgoing(int status) { } } +void Http2Session::PushOutgoingBuffer(nghttp2_stream_write&& write) { + outgoing_length_ += write.buf.len; + outgoing_buffers_.emplace_back(std::move(write)); +} + // Queue a given block of data for sending. This always creates a copy, // so it is used for the cases in which nghttp2 requests sending of a // small chunk of data. @@ -1692,7 +1734,7 @@ void Http2Session::CopyDataIntoOutgoing(const uint8_t* src, size_t src_length) { // of the outgoing_buffers_ vector may invalidate the pointer. // The correct base pointers will be set later, before writing to the // underlying socket. - outgoing_buffers_.emplace_back(nghttp2_stream_write { + PushOutgoingBuffer(nghttp2_stream_write { uv_buf_init(nullptr, src_length) }); } @@ -1826,13 +1868,13 @@ int Http2Session::OnSendData( if (write.buf.len <= length) { // This write does not suffice by itself, so we can consume it completely. length -= write.buf.len; - session->outgoing_buffers_.emplace_back(std::move(write)); + session->PushOutgoingBuffer(std::move(write)); stream->queue_.pop(); continue; } // Slice off `length` bytes of the first write in the queue. - session->outgoing_buffers_.emplace_back(nghttp2_stream_write { + session->PushOutgoingBuffer(nghttp2_stream_write { uv_buf_init(write.buf.base, length) }); write.buf.base += length; @@ -1842,7 +1884,7 @@ int Http2Session::OnSendData( if (frame->data.padlen > 0) { // Send padding if that was requested. - session->outgoing_buffers_.emplace_back(nghttp2_stream_write { + session->PushOutgoingBuffer(nghttp2_stream_write { uv_buf_init(const_cast(zero_bytes_256), frame->data.padlen - 1) }); } @@ -1896,8 +1938,6 @@ void Http2Session::OnStreamReadImpl(ssize_t nread, Http2Scope h2scope(session); CHECK_NE(session->stream_, nullptr); DEBUG_HTTP2SESSION2(session, "receiving %d bytes", nread); - CHECK_EQ(session->stream_buf_allocation_.base, nullptr); - CHECK(session->stream_buf_ab_.IsEmpty()); // Only pass data on if nread > 0 if (nread <= 0) { @@ -1913,26 +1953,34 @@ void Http2Session::OnStreamReadImpl(ssize_t nread, return; } - // Shrink to the actual amount of used data. uv_buf_t buf = *buf_; - buf.base = Realloc(buf.base, nread); - session->IncrementCurrentSessionMemory(nread); - OnScopeLeave on_scope_leave([&]() { - // Once finished handling this write, reset the stream buffer. - // The memory has either been free()d or was handed over to V8. - // We use `nread` instead of `buf.size()` here, because the buffer is - // cleared as part of the `.ToArrayBuffer()` call below. - session->DecrementCurrentSessionMemory(nread); + session->statistics_.data_received += nread; + + if (UNLIKELY(session->stream_buf_offset_ > 0)) { + // This is a very unlikely case, and should only happen if the ReadStart() + // call in OnStreamAfterWrite() immediately provides data. If that does + // happen, we concatenate the data we received with the already-stored + // pending input data, slicing off the already processed part. + char* new_buf = Malloc( + session->stream_buf_.len - session->stream_buf_offset_ + nread); + memcpy(new_buf, + session->stream_buf_.base + session->stream_buf_offset_, + session->stream_buf_.len - session->stream_buf_offset_); + memcpy(new_buf + session->stream_buf_.len - session->stream_buf_offset_, + buf.base, + nread); + free(buf.base); + nread = session->stream_buf_.len - session->stream_buf_offset_ + nread; + buf = uv_buf_init(new_buf, nread); + session->stream_buf_offset_ = 0; session->stream_buf_ab_.Reset(); - free(session->stream_buf_allocation_.base); - session->stream_buf_allocation_ = uv_buf_init(nullptr, 0); - session->stream_buf_ = uv_buf_init(nullptr, 0); - }); + session->DecrementCurrentSessionMemory(session->stream_buf_offset_); + } - // Make sure that there was no read previously active. - CHECK_EQ(session->stream_buf_.base, nullptr); - CHECK_EQ(session->stream_buf_.len, 0); + // Shrink to the actual amount of used data. + buf.base = Realloc(buf.base, nread); + session->IncrementCurrentSessionMemory(nread); // Remember the current buffer, so that OnDataChunkReceived knows the // offset of a DATA frame's data into the socket read buffer. @@ -1949,8 +1997,7 @@ void Http2Session::OnStreamReadImpl(ssize_t nread, // to copy memory. session->stream_buf_allocation_ = buf; - session->statistics_.data_received += nread; - ssize_t ret = session->Write(&session->stream_buf_, 1); + ssize_t ret = session->ConsumeHTTP2Data(); if (UNLIKELY(ret < 0)) { DEBUG_HTTP2SESSION2(session, "fatal error receiving data: %d", ret); diff --git a/src/node_http2.h b/src/node_http2.h index bb49bedce3544a..a1a7beba17cc96 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -386,6 +386,7 @@ enum session_state_flags { SESSION_STATE_SENDING = 0x10, SESSION_STATE_WRITE_IN_PROGRESS = 0x20, SESSION_STATE_READING_STOPPED = 0x40, + SESSION_STATE_NGHTTP2_RECV_PAUSED = 0x80 }; // This allows for 4 default-sized frames with their frame headers @@ -831,8 +832,8 @@ class Http2Session : public AsyncWrap { // Indicates whether there currently exist outgoing buffers for this stream. bool HasWritesOnSocketForStream(Http2Stream* stream); - // Write data to the session - inline ssize_t Write(const uv_buf_t* bufs, size_t nbufs); + // Write data from stream_buf_ to the session + ssize_t ConsumeHTTP2Data(); size_t self_size() const override { return sizeof(*this); } @@ -896,6 +897,9 @@ class Http2Session : public AsyncWrap { } void DecrementCurrentSessionMemory(uint64_t amount) { +#ifdef DEBUG + CHECK_LE(amount, current_session_memory_); +#endif current_session_memory_ -= amount; } @@ -1061,8 +1065,11 @@ class Http2Session : public AsyncWrap { uint32_t chunks_sent_since_last_write_ = 0; uv_buf_t stream_buf_ = uv_buf_init(nullptr, 0); + // When processing input data, either stream_buf_ab_ or stream_buf_allocation_ + // will be set. stream_buf_ab_ is lazily created from stream_buf_allocation_. v8::Global stream_buf_ab_; uv_buf_t stream_buf_allocation_ = uv_buf_init(nullptr, 0); + size_t stream_buf_offset_ = 0; size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS; std::queue outstanding_pings_; @@ -1072,6 +1079,7 @@ class Http2Session : public AsyncWrap { std::vector outgoing_buffers_; std::vector outgoing_storage_; + size_t outgoing_length_ = 0; std::vector pending_rst_streams_; // Count streams that have been rejected while being opened. Exceeding a fixed // limit will result in the session being destroyed, as an indication of a @@ -1081,6 +1089,7 @@ class Http2Session : public AsyncWrap { // Also use the invalid frame count as a measure for rejecting input frames. int32_t invalid_frame_count_ = 0; + void PushOutgoingBuffer(nghttp2_stream_write&& write); void CopyDataIntoOutgoing(const uint8_t* src, size_t src_length); void ClearOutgoing(int status); diff --git a/test/parallel/test-http2-large-write-multiple-requests.js b/test/parallel/test-http2-large-write-multiple-requests.js new file mode 100644 index 00000000000000..4ca2774ca0a1a1 --- /dev/null +++ b/test/parallel/test-http2-large-write-multiple-requests.js @@ -0,0 +1,40 @@ +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); + +const fixtures = require('../common/fixtures'); +const assert = require('assert'); +const http2 = require('http2'); + +const content = fixtures.readSync('person-large.jpg'); + +const server = http2.createServer({ + maxSessionMemory: 1000 +}); +server.on('stream', (stream, headers) => { + stream.respond({ + 'content-type': 'image/jpeg', + ':status': 200 + }); + stream.end(content); +}); +server.unref(); + +server.listen(0, common.mustCall(() => { + const client = http2.connect(`http://localhost:${server.address().port}/`); + + let finished = 0; + for (let i = 0; i < 100; i++) { + const req = client.request({ ':path': '/' }); + req.end(); + const chunks = []; + req.on('data', (chunk) => { + chunks.push(chunk); + }); + req.on('end', common.mustCall(() => { + assert.deepStrictEqual(Buffer.concat(chunks), content); + if (++finished === 100) client.close(); + })); + } +}));