Skip to content

Commit

Permalink
Merge pull request #81 from planetary-social/shared-rate-limit-manager
Browse files Browse the repository at this point in the history
Shared rate limiter for multiple conns
  • Loading branch information
dcadenas committed Feb 14, 2024
2 parents fc26bb4 + 6908f7b commit 6281d72
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 80 deletions.
29 changes: 14 additions & 15 deletions service/domain/relay_address.go
Expand Up @@ -9,7 +9,8 @@ import (
)

type RelayAddress struct {
original string
original string
hostWithoutPort string
}

func NewRelayAddress(s string) (RelayAddress, error) {
Expand All @@ -25,8 +26,16 @@ func NewRelayAddress(s string) (RelayAddress, error) {
return RelayAddress{}, errors.New("invalid protocol")
}

u.Host = strings.ToLower(u.Host)
hostWithoutPort, _, err := net.SplitHostPort(u.Host)
if err != nil {
hostWithoutPort = u.Host
}
normalizedURI := u.String()

return RelayAddress{
original: s,
original: normalizedURI,
hostWithoutPort: hostWithoutPort,
}, nil
}

Expand All @@ -43,22 +52,12 @@ func NewRelayAddressFromMaybeAddress(maybe MaybeRelayAddress) (RelayAddress, err
}

func (r RelayAddress) IsLoopbackOrPrivate() bool {
hostWithoutPort := r.getHostWithoutPort()
ip := net.ParseIP(hostWithoutPort)
ip := net.ParseIP(r.hostWithoutPort)
return ip.IsLoopback() || ip.IsPrivate()
}

func (r RelayAddress) getHostWithoutPort() string {
u, err := url.Parse(r.original)
if err != nil {
panic(err) // checked in constructor
}

hostWithoutPort, _, err := net.SplitHostPort(u.Host)
if err != nil {
return u.Host
}
return hostWithoutPort
func (r RelayAddress) HostWithoutPort() string {
return r.hostWithoutPort
}

func (r RelayAddress) String() string {
Expand Down
6 changes: 5 additions & 1 deletion service/domain/relay_address_test.go
Expand Up @@ -43,9 +43,13 @@ func TestRelayAddress(t *testing.T) {
Input: "wss://example.com/ ",
Output: "wss://example.com",
},
{
Input: "wss://EXAMPLE.com/FooBar ",
Output: "wss://example.com/FooBar",
},
{
Input: "wss://example1.com/ wss://example2.com",
Output: "wss://example1.com/ wss://example2.com",
Output: "wss://example1.com/%20wss://example2.com",
},
{
Input: "wss:// wss://example.com",
Expand Down
69 changes: 69 additions & 0 deletions service/domain/relays/rate_limit_notice_backoff_manager.go
@@ -0,0 +1,69 @@
package relays

import (
"math"
"sync/atomic"
"time"
)

type RateLimitNoticeBackoffManager struct {
rateLimitNoticeCount int32
lastBumpTime atomic.Value
}

func NewRateLimitNoticeBackoffManager() *RateLimitNoticeBackoffManager {
r := &RateLimitNoticeBackoffManager{
rateLimitNoticeCount: 0,
}

r.updateLastBumpTime()
return r
}

func (r *RateLimitNoticeBackoffManager) Bump() {
timeSinceLastBump := time.Since(r.getLastBumpTime())
if timeSinceLastBump < 500*time.Millisecond {
// Give some time for the rate limit to be lifted before increasing the counter
return
}

atomic.AddInt32(&r.rateLimitNoticeCount, 1)
r.updateLastBumpTime()
}

const maxBackoffMs = 10000
const secondsToDecreaseRateLimitNoticeCount = 60 * 5 // 5 minutes = 300 seconds

func (r *RateLimitNoticeBackoffManager) Wait() {
rateLimitNoticeCount := atomic.LoadInt32(&r.rateLimitNoticeCount)
if rateLimitNoticeCount <= 0 {
return
}

backoffMs := int(math.Min(float64(maxBackoffMs), math.Pow(2, float64(r.rateLimitNoticeCount))*50))

timeSinceLastBump := time.Since(r.getLastBumpTime())
if timeSinceLastBump > secondsToDecreaseRateLimitNoticeCount*time.Second {
atomic.AddInt32(&r.rateLimitNoticeCount, -1)
r.updateLastBumpTime()
}

if backoffMs > 0 {
time.Sleep(time.Duration(backoffMs) * time.Millisecond)
}
}

func (r *RateLimitNoticeBackoffManager) updateLastBumpTime() time.Time {
t := time.Now()
r.lastBumpTime.Store(t)
return t
}

func (r *RateLimitNoticeBackoffManager) getLastBumpTime() time.Time {
val := r.lastBumpTime.Load()
if t, ok := val.(time.Time); ok {
return t
}

return r.updateLastBumpTime()
}
57 changes: 2 additions & 55 deletions service/domain/relays/relay_connection.go
Expand Up @@ -8,7 +8,6 @@ import (
"regexp"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/boreq/errors"
Expand Down Expand Up @@ -54,59 +53,6 @@ type ConnectionFactory interface {
Address() domain.RelayAddress
}

type RateLimitNoticeBackoffManager struct {
address domain.RelayAddress
rateLimitNoticeCount int32
lastBumpTime atomic.Value // Use atomic.Value for time.Time
}

func (r *RateLimitNoticeBackoffManager) updateLastBumpTime() {
r.lastBumpTime.Store(time.Now())
}

func (r *RateLimitNoticeBackoffManager) getLastBumpTime() time.Time {
return r.lastBumpTime.Load().(time.Time)
}

func NewRateLimitNoticeBackoffManager(address domain.RelayAddress) *RateLimitNoticeBackoffManager {
r := &RateLimitNoticeBackoffManager{
address: address,
rateLimitNoticeCount: 0,
}

r.updateLastBumpTime()
return r
}

func (r *RateLimitNoticeBackoffManager) Bump() {
timeSinceLastBump := time.Since(r.getLastBumpTime())
if timeSinceLastBump < 500*time.Millisecond {
// Give some time for the rate limit to be lifted before increasing the counter
return
}

atomic.AddInt32(&r.rateLimitNoticeCount, 1)
r.updateLastBumpTime()
}

func (r *RateLimitNoticeBackoffManager) Wait() {
if r.rateLimitNoticeCount <= 0 {
return
}

backoffMs := int(math.Pow(2, float64(r.rateLimitNoticeCount))) * 100

timeSinceLastBump := time.Since(r.getLastBumpTime())
if timeSinceLastBump > 5*time.Second {
atomic.AddInt32(&r.rateLimitNoticeCount, -1)
r.updateLastBumpTime()
}

if backoffMs > 0 {
time.Sleep(time.Duration(backoffMs) * time.Millisecond)
}
}

type RelayConnection struct {
connectionFactory ConnectionFactory
logger logging.Logger
Expand All @@ -128,6 +74,7 @@ type RelayConnection struct {

func NewRelayConnection(
connectionFactory ConnectionFactory,
rateLimitNoticeBackoffManager *RateLimitNoticeBackoffManager,
logger logging.Logger,
metrics Metrics,
) *RelayConnection {
Expand All @@ -141,7 +88,7 @@ func NewRelayConnection(
subscriptionsUpdatedCh: make(chan struct{}),
eventsToSend: make(map[domain.EventId]*eventToSend),
newEventsCh: make(chan domain.Event),
rateLimitNoticeBackoffManager: NewRateLimitNoticeBackoffManager(connectionFactory.Address()),
rateLimitNoticeBackoffManager: rateLimitNoticeBackoffManager,
}
}

Expand Down
3 changes: 2 additions & 1 deletion service/domain/relays/relay_connection_test.go
Expand Up @@ -135,9 +135,10 @@ type testConnection struct {
func newTestConnection(tb testing.TB, ctx context.Context) *testConnection {
connection := newMockConnection()
factory := newMockConnectionFactory(connection)
backoffManager := relays.NewRateLimitNoticeBackoffManager()
metrics := newMockMetrics()
logger := logging.NewDevNullLogger()
relayConnection := relays.NewRelayConnection(factory, logger, metrics)
relayConnection := relays.NewRelayConnection(factory, backoffManager, logger, metrics)
go relayConnection.Run(ctx)

return &testConnection{
Expand Down
28 changes: 20 additions & 8 deletions service/domain/relays/relay_connections.go
Expand Up @@ -45,17 +45,19 @@ type RelayConnections struct {

longCtx context.Context

connections map[domain.RelayAddress]*RelayConnection
connectionsLock sync.Mutex
connections map[domain.RelayAddress]*RelayConnection
rateLimitNoticeBackoffManagers map[string]*RateLimitNoticeBackoffManager
connectionsLock sync.Mutex
}

func NewRelayConnections(ctx context.Context, logger logging.Logger, metrics Metrics) *RelayConnections {
v := &RelayConnections{
logger: logger.New("relayConnections"),
metrics: metrics,
longCtx: ctx,
connections: make(map[domain.RelayAddress]*RelayConnection),
connectionsLock: sync.Mutex{},
logger: logger.New("relayConnections"),
metrics: metrics,
longCtx: ctx,
connections: make(map[domain.RelayAddress]*RelayConnection),
rateLimitNoticeBackoffManagers: make(map[string]*RateLimitNoticeBackoffManager),
connectionsLock: sync.Mutex{},
}
go v.storeMetricsLoop(ctx)
return v
Expand Down Expand Up @@ -104,7 +106,17 @@ func (r *RelayConnections) getConnection(relayAddress domain.RelayAddress) *Rela
}

factory := NewWebsocketConnectionFactory(relayAddress, r.logger)
connection := NewRelayConnection(factory, r.logger, r.metrics)

// Sometimes different addreses can point to the same relay. Example is
// wss://feeds.nostr.band/video and wss://feeds.nostr.band/audio. For these
// cases, we want to share the rate limit notice backoff manager.
rateLimitNoticeBackoffManager, exists := r.rateLimitNoticeBackoffManagers[relayAddress.HostWithoutPort()]
if !exists {
rateLimitNoticeBackoffManager = NewRateLimitNoticeBackoffManager()
r.rateLimitNoticeBackoffManagers[relayAddress.HostWithoutPort()] = rateLimitNoticeBackoffManager
}

connection := NewRelayConnection(factory, rateLimitNoticeBackoffManager, r.logger, r.metrics)
go connection.Run(r.longCtx)

r.connections[relayAddress] = connection
Expand Down

0 comments on commit 6281d72

Please sign in to comment.