diff --git a/src/node_http2.cc b/src/node_http2.cc index bc5584797f9faa..bf4dd8569d816f 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -675,7 +675,7 @@ Http2Session::Http2Session(Environment* env, ArrayBuffer::New(env->isolate(), js_fields_, kSessionUint8FieldCount); Local uint8_arr = Uint8Array::New(ab, 0, kSessionUint8FieldCount); - /*USE*/(wrap->Set(env->context(), env->fields_string(), uint8_arr)); + USE(wrap->Set(env->context(), env->fields_string(), uint8_arr)); } } @@ -702,6 +702,7 @@ Http2Session::~Http2Session() { DEBUG_HTTP2SESSION(this, "freeing nghttp2 session"); nghttp2_session_del(session_); CHECK_EQ(current_nghttp2_memory_, 0); + free(stream_buf_allocation_.base); } inline bool HasHttp2Observer(Environment* env) { @@ -1211,18 +1212,31 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle, stream->statistics_.received_bytes += len; + Local ab; + if (session->stream_buf_ab_.IsEmpty()) { + ab = ArrayBuffer::New(env->isolate(), + session->stream_buf_allocation_.base, + session->stream_buf_allocation_.len, + v8::ArrayBufferCreationMode::kInternalized); + session->stream_buf_allocation_ = uv_buf_init(nullptr, 0); + session->stream_buf_ab_.Reset(env->isolate(), ab); + } else { + ab = session->stream_buf_ab_.Get(env->isolate()); + } + // There is a single large array buffer for the entire data read from the // network; create a slice of that array buffer and emit it as the // received data buffer. - CHECK(!session->stream_buf_ab_.IsEmpty()); - size_t offset = reinterpret_cast(data) - session->stream_buf_; + size_t offset = data - reinterpret_cast(session->stream_buf_.base); + // Verify that the data offset is inside the current read buffer. - CHECK_LE(offset, session->stream_buf_size_); + CHECK_LE(offset, session->stream_buf_.len); + CHECK_LE(offset + len, session->stream_buf_.len); - Local buf = - Buffer::New(env, session->stream_buf_ab_, offset, len).ToLocalChecked(); + Local buffer = + Buffer::New(env, ab, offset, len).ToLocalChecked(); - stream->EmitData(len, buf, Local()); + stream->EmitData(len, buffer, Local()); if (!stream->IsReading()) stream->inbound_consumed_data_while_paused_ += len; else @@ -1841,24 +1855,27 @@ void Http2Session::OnStreamAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) { Http2Session* session = static_cast(ctx); - CHECK_EQ(session->stream_buf_, nullptr); - CHECK_EQ(session->stream_buf_size_, 0); - buf->base = session->stream_buf_ = Malloc(suggested_size); - buf->len = session->stream_buf_size_ = suggested_size; - session->IncrementCurrentSessionMemory(suggested_size); + CHECK_EQ(session->stream_buf_.base, nullptr); + CHECK_EQ(session->stream_buf_.len, 0); + *buf = uv_buf_init(Malloc(suggested_size), suggested_size); } // Callback used to receive inbound data from the i/o stream void Http2Session::OnStreamReadImpl(ssize_t nread, - const uv_buf_t* buf, + const uv_buf_t* buf_, uv_handle_type pending, void* ctx) { Http2Session* session = static_cast(ctx); Http2Scope h2scope(session); CHECK_NE(session->stream_, nullptr); DEBUG_HTTP2SESSION2(session, "receiving %d bytes", nread); + CHECK_EQ(session->stream_buf_allocation_.base, nullptr); + CHECK(session->stream_buf_ab_.IsEmpty()); + + // Only pass data on if nread > 0 if (nread <= 0) { - free(session->stream_buf_); + if (buf_ != nullptr) + free(buf_->base); if (nread < 0) { uv_buf_t tmp_buf = uv_buf_init(nullptr, 0); session->prev_read_cb_.fn(nread, @@ -1866,58 +1883,54 @@ void Http2Session::OnStreamReadImpl(ssize_t nread, pending, session->prev_read_cb_.ctx); } - } else { - // Only pass data on if nread > 0 - - // Verify that currently: There is memory allocated into which - // the data has been read, and that memory buffer is at least as large - // as the amount of data we have read, but we have not yet made an - // ArrayBuffer out of it. - CHECK_NE(session->stream_buf_, nullptr); - CHECK_EQ(session->stream_buf_, buf->base); - CHECK_EQ(session->stream_buf_size_, buf->len); - CHECK_GE(session->stream_buf_size_, static_cast(nread)); - CHECK(session->stream_buf_ab_.IsEmpty()); + return; + } - Environment* env = session->env(); - Isolate* isolate = env->isolate(); - HandleScope scope(isolate); - Local context = env->context(); - Context::Scope context_scope(context); + // Shrink to the actual amount of used data. + uv_buf_t buf = *buf_; + buf.base = Realloc(buf.base, nread); + + session->IncrementCurrentSessionMemory(nread); + OnScopeLeave on_scope_leave([&]() { + // Once finished handling this write, reset the stream buffer. + // The memory has either been free()d or was handed over to V8. + // We use `nread` instead of `buf.size()` here, because the buffer is + // cleared as part of the `.ToArrayBuffer()` call below. + session->DecrementCurrentSessionMemory(nread); + session->stream_buf_ab_.Reset(); + free(session->stream_buf_allocation_.base); + session->stream_buf_allocation_ = uv_buf_init(nullptr, 0); + session->stream_buf_ = uv_buf_init(nullptr, 0); + }); - // Create an array buffer for the read data. DATA frames will be emitted - // as slices of this array buffer to avoid having to copy memory. - session->stream_buf_ab_ = - ArrayBuffer::New(isolate, - session->stream_buf_, - session->stream_buf_size_, - v8::ArrayBufferCreationMode::kInternalized); - - uv_buf_t buf_ = uv_buf_init(buf->base, nread); - session->statistics_.data_received += nread; - ssize_t ret = session->Write(&buf_, 1); - - // Note: if ssize_t is not defined (e.g. on Win32), nghttp2 will typedef - // ssize_t to int. Cast here so that the < 0 check actually works on - // Windows. - if (static_cast(ret) < 0) { - DEBUG_HTTP2SESSION2(session, "fatal error receiving data: %d", ret); - - Local argv[1] = { - Integer::New(isolate, ret), - }; - session->MakeCallback(env->error_string(), arraysize(argv), argv); - } else { - session->MaybeStopReading(); - } - } + // Make sure that there was no read previously active. + CHECK_EQ(session->stream_buf_.base, nullptr); + CHECK_EQ(session->stream_buf_.len, 0); + + // Remember the current buffer, so that OnDataChunkReceived knows the + // offset of a DATA frame's data into the socket read buffer. + session->stream_buf_ = uv_buf_init(buf.base, nread); - // Since we are finished handling this write, reset the stream buffer. - // The memory has either been free()d or was handed over to V8. - session->DecrementCurrentSessionMemory(session->stream_buf_size_); - session->stream_buf_ = nullptr; - session->stream_buf_size_ = 0; - session->stream_buf_ab_ = Local(); + // Verify that currently: There is memory allocated into which + // the data has been read, and that memory buffer is at least as large + // as the amount of data we have read, but we have not yet made an + // ArrayBuffer out of it. + CHECK_LE(static_cast(nread), session->stream_buf_.len); + + // Store this so we can create an ArrayBuffer for read data from it. + // DATA frames will be emitted as slices of that ArrayBuffer to avoid having + // to copy memory. + session->stream_buf_allocation_ = buf; + + session->statistics_.data_received += nread; + ssize_t ret = session->Write(&session->stream_buf_, 1); + + if (UNLIKELY(ret < 0)) { + DEBUG_HTTP2SESSION2(session, "fatal error receiving data: %d", ret); + Local arg = Integer::New(session->env()->isolate(), ret); + session->MakeCallback(session->env()->error_string(), 1, &arg); + return; + } } void Http2Session::OnStreamDestructImpl(void* ctx) { diff --git a/src/node_http2.h b/src/node_http2.h index 9e4a83d0a50b39..226cafdb2bfe35 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -834,10 +834,6 @@ class Http2Session : public AsyncWrap { size_t self_size() const override { return sizeof(*this); } - char* stream_alloc() { - return stream_buf_; - } - // Schedule an RstStream for after the current write finishes. inline void AddPendingRstStream(int32_t stream_id) { pending_rst_streams_.emplace_back(stream_id); @@ -1062,9 +1058,9 @@ class Http2Session : public AsyncWrap { // use this to allow timeout tracking during long-lasting writes uint32_t chunks_sent_since_last_write_ = 0; - char* stream_buf_ = nullptr; - size_t stream_buf_size_ = 0; - v8::Local stream_buf_ab_; + uv_buf_t stream_buf_ = uv_buf_init(nullptr, 0); + v8::Global stream_buf_ab_; + uv_buf_t stream_buf_allocation_ = uv_buf_init(nullptr, 0); size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS; std::queue outstanding_pings_; diff --git a/src/util.h b/src/util.h index bf5e515083d447..c7f48e48f67893 100644 --- a/src/util.h +++ b/src/util.h @@ -33,6 +33,7 @@ #include #include +#include #include // std::remove_reference namespace node { @@ -433,6 +434,17 @@ class BufferValue : public MaybeStackBuffer { if (name##_length > 0) \ CHECK_NE(name##_data, nullptr); +// Use this when a variable or parameter is unused in order to explicitly +// silence a compiler warning about that. +template inline void USE(T&&) {} + +// Run a function when exiting the current scope. +struct OnScopeLeave { + std::function fn_; + + explicit OnScopeLeave(std::function fn) : fn_(fn) {} + ~OnScopeLeave() { fn_(); } +}; } // namespace node diff --git a/test/sequential/test-http2-max-session-memory.js b/test/sequential/test-http2-max-session-memory.js index 644a20a3c88a50..f770ee113945fc 100644 --- a/test/sequential/test-http2-max-session-memory.js +++ b/test/sequential/test-http2-max-session-memory.js @@ -8,7 +8,7 @@ const http2 = require('http2'); // Test that maxSessionMemory Caps work -const largeBuffer = Buffer.alloc(1e6); +const largeBuffer = Buffer.alloc(2e6); const server = http2.createServer({ maxSessionMemory: 1 });