Skip to content

Commit

Permalink
Merge pull request #83 from planetary-social/rate-limit-metrics
Browse files Browse the repository at this point in the history
Add metrics for rate limits
  • Loading branch information
dcadenas committed Mar 5, 2024
2 parents 841cb77 + 813ffcc commit ff78308
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 9 deletions.
3 changes: 3 additions & 0 deletions cmd/send-all-events-to-relay/main.go
Expand Up @@ -334,6 +334,9 @@ func (m mockMetrics) ReportRelayConnectionsState(v map[domain.RelayAddress]relay
// ReportNumberOfSubscriptions is a no-op so the mock satisfies the Metrics
// interface; this command does not record subscription counts.
func (m mockMetrics) ReportNumberOfSubscriptions(address domain.RelayAddress, n int) {
}

// ReportRateLimitBackoffMs is a no-op so the mock satisfies the Metrics
// interface; this command does not record rate-limit backoff values.
func (m mockMetrics) ReportRateLimitBackoffMs(address domain.RelayAddress, n int) {
}

// ReportMessageReceived is a no-op so the mock satisfies the Metrics
// interface; this command does not record per-message metrics.
func (m mockMetrics) ReportMessageReceived(address domain.RelayAddress, messageType relays.MessageType, err *error) {
}

Expand Down
16 changes: 16 additions & 0 deletions service/adapters/prometheus/prometheus.go
Expand Up @@ -46,6 +46,7 @@ type Prometheus struct {
relayConnectionStateGauge *prometheus.GaugeVec
receivedEventsCounter *prometheus.CounterVec
relayConnectionSubscriptionsGauge *prometheus.GaugeVec
relayRateLimitBackoffMsGauge *prometheus.GaugeVec
relayConnectionReceivedMessagesCounter *prometheus.CounterVec
relayConnectionDisconnectionsCounter *prometheus.CounterVec
storedRelayAddressesGauge prometheus.Gauge
Expand Down Expand Up @@ -127,6 +128,13 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
},
[]string{labelAddress},
)
relayRateLimitBackoffMsGauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "relay_rate_limit_backoff_ms_gauge",
Help: "Rate limit wait in milliseconds.",
},
[]string{labelAddress},
)
relayConnectionReceivedMessagesCounter := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "relay_connection_received_messages_counter",
Expand Down Expand Up @@ -173,6 +181,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
relayConnectionStateGauge,
receivedEventsCounter,
relayConnectionSubscriptionsGauge,
relayRateLimitBackoffMsGauge,
relayConnectionReceivedMessagesCounter,
relayConnectionDisconnectionsCounter,
storedRelayAddressesGauge,
Expand Down Expand Up @@ -210,6 +219,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
relayConnectionStateGauge: relayConnectionStateGauge,
receivedEventsCounter: receivedEventsCounter,
relayConnectionSubscriptionsGauge: relayConnectionSubscriptionsGauge,
relayRateLimitBackoffMsGauge: relayRateLimitBackoffMsGauge,
relayConnectionReceivedMessagesCounter: relayConnectionReceivedMessagesCounter,
relayConnectionDisconnectionsCounter: relayConnectionDisconnectionsCounter,
storedRelayAddressesGauge: storedRelayAddressesGauge,
Expand Down Expand Up @@ -260,6 +270,12 @@ func (p *Prometheus) ReportNumberOfSubscriptions(address domain.RelayAddress, n
}).Set(float64(n))
}

// ReportRateLimitBackoffMs records the current rate-limit backoff, in
// milliseconds, for the given relay address on the backoff gauge.
func (p *Prometheus) ReportRateLimitBackoffMs(address domain.RelayAddress, n int) {
	labels := prometheus.Labels{labelAddress: address.String()}
	p.relayRateLimitBackoffMsGauge.With(labels).Set(float64(n))
}

func (p *Prometheus) ReportMessageReceived(address domain.RelayAddress, messageType relays.MessageType, err *error) {
labels := prometheus.Labels{
labelAddress: address.String(),
Expand Down
14 changes: 10 additions & 4 deletions service/domain/relays/rate_limit_notice_backoff_manager.go
Expand Up @@ -40,8 +40,16 @@ const maxBackoffMs = 10000
const secondsToDecreaseRateLimitNoticeCount = 60 * 5 // 5 minutes = 300 seconds

// Wait sleeps for the currently computed backoff duration, if any, before
// the caller resumes talking to the relay. When no backoff is in effect
// (GetBackoffMs returns 0) it returns immediately.
func (r *RateLimitNoticeBackoffManager) Wait() {
	if ms := r.GetBackoffMs(); ms > 0 {
		time.Sleep(time.Duration(ms) * time.Millisecond)
	}
}

func (r *RateLimitNoticeBackoffManager) GetBackoffMs() int {
if !r.IsSet() {
return
return 0
}

backoffMs := int(math.Min(float64(maxBackoffMs), math.Pow(2, float64(r.rateLimitNoticeCount))*50))
Expand All @@ -52,9 +60,7 @@ func (r *RateLimitNoticeBackoffManager) Wait() {
r.updateLastBumpTime()
}

if backoffMs > 0 {
time.Sleep(time.Duration(backoffMs) * time.Millisecond)
}
return backoffMs
}

func (r *RateLimitNoticeBackoffManager) updateLastBumpTime() time.Time {
Expand Down
3 changes: 3 additions & 0 deletions service/domain/relays/relay_connection_test.go
Expand Up @@ -253,6 +253,9 @@ func (m2 mockMetrics) ReportRelayConnectionsState(m map[domain.RelayAddress]rela
// ReportNumberOfSubscriptions is a no-op; the test mock discards metrics.
func (m2 mockMetrics) ReportNumberOfSubscriptions(address domain.RelayAddress, n int) {
}

// ReportRateLimitBackoffMs is a no-op; the test mock discards metrics.
func (m2 mockMetrics) ReportRateLimitBackoffMs(address domain.RelayAddress, n int) {
}

// ReportMessageReceived is a no-op; the test mock discards metrics.
func (m2 mockMetrics) ReportMessageReceived(address domain.RelayAddress, messageType relays.MessageType, err *error) {
}

Expand Down
18 changes: 13 additions & 5 deletions service/domain/relays/relay_connections.go
Expand Up @@ -12,6 +12,7 @@ import (
type Metrics interface {
ReportRelayConnectionsState(m map[domain.RelayAddress]RelayConnectionState)
ReportNumberOfSubscriptions(address domain.RelayAddress, n int)
ReportRateLimitBackoffMs(address domain.RelayAddress, n int)
ReportMessageReceived(address domain.RelayAddress, messageType MessageType, err *error)
ReportRelayDisconnection(address domain.RelayAddress, err error)
ReportNotice(address domain.RelayAddress, noticeType NoticeType)
Expand Down Expand Up @@ -91,10 +92,21 @@ func (d *RelayConnections) storeMetrics() {

m := make(map[domain.RelayAddress]RelayConnectionState)
for _, connection := range d.connections {
rateLimitNoticeBackoffManager := d.getRateLimitNoticeBackoffManager(connection.Address())
d.metrics.ReportRateLimitBackoffMs(connection.Address(), rateLimitNoticeBackoffManager.GetBackoffMs())
m[connection.Address()] = connection.State()
}
d.metrics.ReportRelayConnectionsState(m)
}
// getRateLimitNoticeBackoffManager returns the backoff manager for the
// relay's host, lazily creating and caching one on first use. Managers are
// keyed by host without port so that different addresses pointing at the
// same relay share a single rate-limit backoff state.
func (r *RelayConnections) getRateLimitNoticeBackoffManager(relayAddress domain.RelayAddress) *RateLimitNoticeBackoffManager {
	host := relayAddress.HostWithoutPort()
	if manager, ok := r.rateLimitNoticeBackoffManagers[host]; ok {
		return manager
	}

	manager := NewRateLimitNoticeBackoffManager()
	r.rateLimitNoticeBackoffManagers[host] = manager
	return manager
}

// Notice that a single connection can serve multiple req. This can cause a too many concurrent requests error if not throttled.
func (r *RelayConnections) getConnection(relayAddress domain.RelayAddress) *RelayConnection {
Expand All @@ -110,11 +122,7 @@ func (r *RelayConnections) getConnection(relayAddress domain.RelayAddress) *Rela
// Sometimes different addresses can point to the same relay. Example is
// wss://feeds.nostr.band/video and wss://feeds.nostr.band/audio. For these
// cases, we want to share the rate limit notice backoff manager.
rateLimitNoticeBackoffManager, exists := r.rateLimitNoticeBackoffManagers[relayAddress.HostWithoutPort()]
if !exists {
rateLimitNoticeBackoffManager = NewRateLimitNoticeBackoffManager()
r.rateLimitNoticeBackoffManagers[relayAddress.HostWithoutPort()] = rateLimitNoticeBackoffManager
}
rateLimitNoticeBackoffManager := r.getRateLimitNoticeBackoffManager(relayAddress)

connection := NewRelayConnection(factory, rateLimitNoticeBackoffManager, r.logger, r.metrics)
go connection.Run(r.longCtx)
Expand Down

0 comments on commit ff78308

Please sign in to comment.