Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

quic: rework socketaddress info tracking and other cleanups #34618

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 0 additions & 5 deletions doc/api/quic.md
Expand Up @@ -288,11 +288,6 @@ added: REPLACEME
* `validateAddress` {boolean} When `true`, the `QuicSocket` will use explicit
address validation using a QUIC `RETRY` frame when listening for new server
sessions. Default: `false`.
* `validateAddressLRU` {boolean} When `true`, validation will be skipped if
the address has been recently validated. Currently, only the 10 most
recently validated addresses are remembered. Setting `validateAddressLRU`
to `true`, will enable the `validateAddress` option as well. Default:
`false`.

The `net.createQuicSocket()` function is used to create new `QuicSocket`
instances associated with a local UDP address.
Expand Down
50 changes: 17 additions & 33 deletions lib/internal/quic/core.js
Expand Up @@ -179,7 +179,6 @@ const {
QUICCLIENTSESSION_OPTION_REQUEST_OCSP,
QUICCLIENTSESSION_OPTION_VERIFY_HOSTNAME_IDENTITY,
QUICSOCKET_OPTIONS_VALIDATE_ADDRESS,
QUICSOCKET_OPTIONS_VALIDATE_ADDRESS_LRU,
QUICSTREAM_HEADERS_KIND_NONE,
QUICSTREAM_HEADERS_KIND_INFORMATIONAL,
QUICSTREAM_HEADERS_KIND_INITIAL,
Expand Down Expand Up @@ -225,6 +224,7 @@ const kOnFileOpened = Symbol('kOnFileOpened');
const kOnFileUnpipe = Symbol('kOnFileUnpipe');
const kOnPipedFileHandleRead = Symbol('kOnPipedFileHandleRead');
const kReady = Symbol('kReady');
const kRemoveFromSocket = Symbol('kRemoveFromSocket');
const kRemoveSession = Symbol('kRemove');
const kRemoveStream = Symbol('kRemoveStream');
const kServerBusy = Symbol('kServerBusy');
Expand Down Expand Up @@ -940,9 +940,6 @@ class QuicSocket extends EventEmitter {
// True if address verification should be used.
validateAddress,

// True if an LRU should be used for add validation
validateAddressLRU,

// Whether qlog should be enabled for sessions
qlog,

Expand All @@ -963,8 +960,6 @@ class QuicSocket extends EventEmitter {
let socketOptions = 0;
if (validateAddress)
socketOptions |= (1 << QUICSOCKET_OPTIONS_VALIDATE_ADDRESS);
if (validateAddressLRU)
socketOptions |= (1 << QUICSOCKET_OPTIONS_VALIDATE_ADDRESS_LRU);

this[kSetHandle](
new QuicSocketHandle(
Expand Down Expand Up @@ -2247,9 +2242,7 @@ class QuicSession extends EventEmitter {
return this[kInternalState].handshakeContinuationHistogram;
}

// TODO(addaleax): This is a temporary solution for testing and should be
// removed later.
removeFromSocket() {
[kRemoveFromSocket]() {
return this[kHandle].removeFromSocket();
}
}
Expand Down Expand Up @@ -2695,7 +2688,17 @@ class QuicStream extends Duplex {
}

[kAfterAsyncWrite]({ bytes }) {
// TODO(@jasnell): Implement this
// There's currently nothing we need to do here. We have
// to have this but it's a non-op
}

[kUpdateTimer]() {
// Timeout is implemented in the QuicSession at the native
// layer. We have to have this here but it's a non-op
}

[kTrackWriteState](stream, bytes) {
// There's currently nothing to do here.
}

[kInspect](depth, options) {
Expand All @@ -2712,17 +2715,6 @@ class QuicStream extends Duplex {
}, depth, options);
}

[kTrackWriteState](stream, bytes) {
// TODO(@jasnell): Not yet sure what we want to do with these
// this.#writeQueueSize += bytes;
// this.#writeQueueSize += bytes;
// this[kHandle].chunksSentSinceLastWrite = 0;
}

[kUpdateTimer]() {
// TODO(@jasnell): Implement this later
}

get detached() {
// The QuicStream is detached if it is yet destroyed
// but the underlying handle is undefined. While in
Expand Down Expand Up @@ -2750,7 +2742,7 @@ class QuicStream extends Duplex {

[kWriteGeneric](writev, data, encoding, cb) {
if (this.destroyed || this.detached)
return; // TODO(addaleax): Can this happen?
return;

this[kUpdateTimer]();
const req = (writev) ?
Expand Down Expand Up @@ -2810,7 +2802,7 @@ class QuicStream extends Duplex {
}

_read(nread) {
if (this.destroyed) { // TODO(addaleax): Can this happen?
if (this.destroyed) {
this.push(null);
return;
}
Expand Down Expand Up @@ -2910,11 +2902,6 @@ class QuicStream extends Duplex {
undefined;
}

get bufferSize() {
// TODO(@jasnell): Implement this
return undefined;
}

get id() {
return this[kInternalState].id;
}
Expand All @@ -2923,10 +2910,6 @@ class QuicStream extends Duplex {
return this[kInternalState].push_id;
}

_onTimeout() {
// TODO(@jasnell): Implement this
}

get session() {
return this[kInternalState].session;
}
Expand Down Expand Up @@ -3127,7 +3110,8 @@ function createSocket(options) {

module.exports = {
createSocket,
kUDPHandleForTesting
kUDPHandleForTesting,
kRemoveFromSocket,
};

/* eslint-enable no-use-before-define */
Expand Down
5 changes: 1 addition & 4 deletions lib/internal/quic/util.js
Expand Up @@ -539,7 +539,6 @@ function validateQuicSocketOptions(options = {}) {
retryTokenTimeout = DEFAULT_RETRYTOKEN_EXPIRATION,
server = {},
statelessResetSecret,
validateAddressLRU = false,
validateAddress = false,
} = options;

Expand All @@ -548,7 +547,6 @@ function validateQuicSocketOptions(options = {}) {
validateObject(server, 'options.server');
validateLookup(lookup);
validateBoolean(validateAddress, 'options.validateAddress');
validateBoolean(validateAddressLRU, 'options.validateAddressLRU');
validateBoolean(qlog, 'options.qlog');
validateBoolean(disableStatelessReset, 'options.disableStatelessReset');

Expand Down Expand Up @@ -597,8 +595,7 @@ function validateQuicSocketOptions(options = {}) {
retryTokenTimeout,
server,
type,
validateAddress: validateAddress || validateAddressLRU,
validateAddressLRU,
validateAddress,
qlog,
statelessResetSecret,
disableStatelessReset,
Expand Down
67 changes: 67 additions & 0 deletions src/node_sockaddr-inl.h
Expand Up @@ -7,6 +7,7 @@
#include "node_internals.h"
#include "node_sockaddr.h"
#include "util-inl.h"
#include "memory_tracker-inl.h"

#include <string>

Expand Down Expand Up @@ -164,6 +165,72 @@ bool SocketAddress::operator==(const SocketAddress& other) const {
bool SocketAddress::operator!=(const SocketAddress& other) const {
return !(*this == other);
}

template <typename T>
SocketAddressLRU<T>::SocketAddressLRU(
size_t max_size)
: max_size_(max_size) {}

template <typename T>
typename T::Type* SocketAddressLRU<T>::Peek(
const SocketAddress& address) const {
auto it = map_.find(address);
return it == std::end(map_) ? nullptr : &it->second->second;
}

template <typename T>
void SocketAddressLRU<T>::CheckExpired() {
auto it = list_.rbegin();
while (it != list_.rend()) {
if (T::CheckExpired(it->first, it->second)) {
map_.erase(it->first);
list_.pop_back();
it = list_.rbegin();
continue;
} else {
break;
}
}
}

template <typename T>
void SocketAddressLRU<T>::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackFieldWithSize("list", size() * sizeof(Pair));
}

// If an item already exists for the given address, bump up it's
// position in the LRU list and return it. If the item does not
// exist, create it. If an item is created, check the size of the
// cache and adjust if necessary. Whether the item exists or not,
// purge expired items.
template <typename T>
typename T::Type* SocketAddressLRU<T>::Upsert(
const SocketAddress& address) {

auto on_exit = OnScopeLeave([&]() { CheckExpired(); });

auto it = map_.find(address);
if (it != std::end(map_)) {
list_.splice(list_.begin(), list_, it->second);
T::Touch(it->first, &it->second->second);
return &it->second->second;
}

list_.push_front(Pair(address, { }));
map_[address] = list_.begin();
T::Touch(list_.begin()->first, &list_.begin()->second);

// Drop the last item in the list if we are
// over the size limit...
if (map_.size() > max_size_) {
auto last = list_.end();
map_.erase((--last)->first);
list_.pop_back();
}

return &map_[address]->second;
}

} // namespace node

#endif // NODE_WANT_INTERNALS
Expand Down
36 changes: 36 additions & 0 deletions src/node_sockaddr.h
Expand Up @@ -9,6 +9,7 @@
#include "v8.h"

#include <string>
#include <list>
#include <unordered_map>

namespace node {
Expand Down Expand Up @@ -116,6 +117,41 @@ class SocketAddress : public MemoryRetainer {
sockaddr_storage address_;
};

template <typename T>
class SocketAddressLRU : public MemoryRetainer {
public:
using Type = typename T::Type;

inline explicit SocketAddressLRU(size_t max_size);

// If the item already exists, returns a reference to
// the existing item, adjusting items position in the
// LRU. If the item does not exist, emplaces the item
// and returns the new item.
Type* Upsert(const SocketAddress& address);

// Returns a reference to the item if it exists, or
// nullptr. The position in the LRU is not modified.
Type* Peek(const SocketAddress& address) const;

size_t size() const { return map_.size(); }
size_t max_size() const { return max_size_; }

void MemoryInfo(MemoryTracker* tracker) const override;
SET_MEMORY_INFO_NAME(SocketAddressLRU)
SET_SELF_SIZE(SocketAddressLRU)

private:
using Pair = std::pair<SocketAddress, Type>;
using Iterator = typename std::list<Pair>::iterator;

void CheckExpired();

std::list<Pair> list_;
SocketAddress::Map<Iterator> map_;
size_t max_size_;
};

} // namespace node

#endif // NOE_WANT_INTERNALS
Expand Down
1 change: 0 additions & 1 deletion src/quic/node_quic.cc
Expand Up @@ -189,7 +189,6 @@ void Initialize(Local<Object> target,
V(QUICSERVERSESSION_OPTION_REJECT_UNAUTHORIZED) \
V(QUICSERVERSESSION_OPTION_REQUEST_CERT) \
V(QUICSOCKET_OPTIONS_VALIDATE_ADDRESS) \
V(QUICSOCKET_OPTIONS_VALIDATE_ADDRESS_LRU) \
V(QUICSTREAM_HEADER_FLAGS_NONE) \
V(QUICSTREAM_HEADER_FLAGS_TERMINAL) \
V(QUICSTREAM_HEADERS_KIND_NONE) \
Expand Down
42 changes: 13 additions & 29 deletions src/quic/node_quic_socket-inl.h
Expand Up @@ -4,7 +4,7 @@
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "node_quic_socket.h"
#include "node_sockaddr.h"
#include "node_sockaddr-inl.h"
#include "node_quic_session.h"
#include "node_crypto.h"
#include "debug_utils-inl.h"
Expand Down Expand Up @@ -112,33 +112,27 @@ void QuicSocket::ReportSendError(int error) {
}

void QuicSocket::IncrementStatelessResetCounter(const SocketAddress& addr) {
reset_counts_[addr]++;
addrLRU_.Upsert(addr)->reset_count++;
}

void QuicSocket::IncrementSocketAddressCounter(const SocketAddress& addr) {
addr_counts_[addr]++;
addrLRU_.Upsert(addr)->active_connections++;
}

void QuicSocket::DecrementSocketAddressCounter(const SocketAddress& addr) {
auto it = addr_counts_.find(addr);
if (it == std::end(addr_counts_))
return;
it->second--;
// Remove the address if the counter reaches zero again.
if (it->second == 0) {
addr_counts_.erase(addr);
reset_counts_.erase(addr);
}
SocketAddressInfo* counts = addrLRU_.Peek(addr);
if (counts != nullptr && counts->active_connections > 0)
counts->active_connections--;
}

size_t QuicSocket::GetCurrentSocketAddressCounter(const SocketAddress& addr) {
auto it = addr_counts_.find(addr);
return it == std::end(addr_counts_) ? 0 : it->second;
SocketAddressInfo* counts = addrLRU_.Peek(addr);
return counts != nullptr ? counts->active_connections : 0;
}

size_t QuicSocket::GetCurrentStatelessResetCounter(const SocketAddress& addr) {
auto it = reset_counts_.find(addr);
return it == std::end(reset_counts_) ? 0 : it->second;
SocketAddressInfo* counts = addrLRU_.Peek(addr);
return counts != nullptr ? counts->reset_count : 0;
}

void QuicSocket::ServerBusy(bool on) {
Expand All @@ -160,22 +154,12 @@ void QuicSocket::set_diagnostic_packet_loss(double rx, double tx) {
}

void QuicSocket::set_validated_address(const SocketAddress& addr) {
if (has_option_validate_address_lru()) {
// Remove the oldest item if we've hit the LRU limit
validated_addrs_.push_back(SocketAddress::Hash()(addr));
if (validated_addrs_.size() > kMaxValidateAddressLru)
validated_addrs_.pop_front();
}
addrLRU_.Upsert(addr)->validated = true;
}

bool QuicSocket::is_validated_address(const SocketAddress& addr) const {
if (has_option_validate_address_lru()) {
auto res = std::find(std::begin(validated_addrs_),
std::end(validated_addrs_),
SocketAddress::Hash()(addr));
return res != std::end(validated_addrs_);
}
return false;
auto info = addrLRU_.Peek(addr);
return info != nullptr ? info->validated : false;
}

void QuicSocket::AddSession(
Expand Down