Skip to content

Commit

Permalink
Merge pull request #82 from planetary-social/wait-after-server-close
Browse files Browse the repository at this point in the history
Correctly identify CLOSED messages and rate limit on them too
  • Loading branch information
dcadenas committed Feb 14, 2024
2 parents 6281d72 + 30aca35 commit 841cb77
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 29 deletions.
24 changes: 18 additions & 6 deletions internal/logging/log.go
Expand Up @@ -7,6 +7,7 @@ import (
"os/exec"
"reflect"
"runtime"
"sync"
"time"

"github.com/davecgh/go-spew/spew"
Expand Down Expand Up @@ -203,12 +204,23 @@ func (d devNullLoggerEntry) WithField(key string, v any) Entry {
func (d devNullLoggerEntry) Message(msg string) {
}

// logPeriodically executes the passed action function if the current time in milliseconds
// modulo logInterval equals zero. This approach allows executing the action periodically,
// approximating the execution to happen once every `logInterval` milliseconds.
func LogPeriodically(action func(), logInterval int64) {
currentTimeMillis := time.Now().UnixNano() / int64(time.Millisecond)
if currentTimeMillis%logInterval == 0 {
// ConfigureTimeThrottler returns a throttled version of the provided action function.
// The returned function will execute the action at most once every specified duration.
// TODO: Maybe this should be moved to a separate package.
func ConfigureTimeThrottler(duration time.Duration) func(action func()) {
var (
lastExecution time.Time
mu sync.Mutex
)
return func(action func()) {
mu.Lock()
defer mu.Unlock()

if time.Since(lastExecution) < duration {
return
}

lastExecution = time.Now()
action()
}
}
Expand Down
8 changes: 6 additions & 2 deletions service/domain/relays/rate_limit_notice_backoff_manager.go
Expand Up @@ -31,12 +31,16 @@ func (r *RateLimitNoticeBackoffManager) Bump() {
r.updateLastBumpTime()
}

func (r *RateLimitNoticeBackoffManager) IsSet() bool {
rateLimitNoticeCount := atomic.LoadInt32(&r.rateLimitNoticeCount)
return rateLimitNoticeCount > 0
}

const maxBackoffMs = 10000
const secondsToDecreaseRateLimitNoticeCount = 60 * 5 // 5 minutes = 300 seconds

func (r *RateLimitNoticeBackoffManager) Wait() {
rateLimitNoticeCount := atomic.LoadInt32(&r.rateLimitNoticeCount)
if rateLimitNoticeCount <= 0 {
if !r.IsSet() {
return
}

Expand Down
91 changes: 70 additions & 21 deletions service/domain/relays/relay_connection.go
Expand Up @@ -333,21 +333,19 @@ func (r *RelayConnection) writeErrorShouldNotBeLogged(err error) bool {
return errors.Is(err, context.Canceled) || errors.Is(err, websocket.ErrCloseSent)
}

func parseMessage(messageBytes []byte) nostr.Envelope {
func parseEnvelope(messageBytes []byte) (nostr.Envelope, bool) {
firstComma := bytes.Index(messageBytes, []byte{','})
if firstComma == -1 {
return nil
return nil, false
}
label := messageBytes[0:firstComma]
containsClose := bytes.Contains(label, []byte("CLOSE"))
if containsClose {
ce := nostr.CloseEnvelope("")
if err := ce.UnmarshalJSON(messageBytes); err == nil {
return &ce
}
containsClosed := bytes.Contains(label, []byte("CLOSED"))
if containsClosed {
return nil, true
}

return nostr.ParseMessage(messageBytes)
nostrEnvelope := nostr.ParseMessage(messageBytes)
return nostrEnvelope, false
}

type NoticeType string
Expand All @@ -364,7 +362,29 @@ const (
)

var unknownFeedRegexp = regexp.MustCompile(`unknown.*feed`)
var rateLimitRegexp = regexp.MustCompile(`rate.*limit|too fast`)
var rateLimitRegexp = regexp.MustCompile(`rate.*limit|too fast|slow.*down`)

type ClosedType string

const (
ClosedTypeRateLimit ClosedType = "RATE_LIMIT"
ClosedTypeAuth ClosedType = "AUTH_REQUIRED"
ClosedTypeUnknown ClosedType = "UNKNOWN"
)

func GetClosedType(content string) ClosedType {
content = strings.ToLower(content)
content = strings.TrimSpace(content)

switch {
case rateLimitRegexp.MatchString(content):
return ClosedTypeRateLimit
case strings.Contains(content, "auth"):
return ClosedTypeAuth
default:
return ClosedTypeUnknown
}
}

func GetNoticeType(content string) NoticeType {
content = strings.ToLower(content)
Expand All @@ -390,12 +410,20 @@ func GetNoticeType(content string) NoticeType {
}
}

var everyTenSeconds func(func()) = logging.ConfigureTimeThrottler(10 * time.Second)

func (r *RelayConnection) handleMessage(messageBytes []byte) (err error) {
address := r.connectionFactory.Address()

envelope := parseMessage(messageBytes)
if envelope == nil {
envelope, isClosed := parseEnvelope(messageBytes)
if envelope == nil && !isClosed {
defer r.metrics.ReportMessageReceived(address, MessageTypeUnknown, &err)
everyTenSeconds(func() {
r.logger.
Debug().
WithField("message", string(messageBytes)).
Message("received an unrecognized envelope message")
})
return errors.New("error parsing message, we are never going to find out what error unfortunately due to the design of this library")
}

Expand Down Expand Up @@ -474,16 +502,37 @@ func (r *RelayConnection) handleMessage(messageBytes []byte) (err error) {

r.passSendEventResponseToChannel(eventID, response)
return nil
case *nostr.CloseEnvelope:
defer r.metrics.ReportMessageReceived(address, MessageTypeClose, &err)
r.logger.
Debug().
WithField("message", string(messageBytes)).
Message("received a message (close)")
return nil
default:
defer r.metrics.ReportMessageReceived(address, MessageTypeUnknown, &err)
return errors.New("unknown message type")
if isClosed {
defer r.metrics.ReportMessageReceived(address, MessageTypeClose, &err)
r.logger.
Debug().
WithField("message", string(messageBytes)).
Message("received a message (closed)")

closedType := GetClosedType(string(messageBytes))
if closedType == ClosedTypeRateLimit || r.rateLimitNoticeBackoffManager.rateLimitNoticeCount <= 0 {
// Always bump rate limit if it's a rate limit CLOSED message,
// but also do it if there is no current rate limit. just in
// case some relays close from rate limiting without sending a
// notice or updating the CLOSED message.

r.rateLimitNoticeBackoffManager.Bump()
}
return nil
} else {
defer r.metrics.ReportMessageReceived(address, MessageTypeUnknown, &err)

// Add some traces to investigate these unknown messages
everyTenSeconds(func() {
r.logger.
Debug().
WithField("message", string(messageBytes)).
Message("received an unknown message")
})
return errors.New("unknown message type")

}
}
}

Expand Down

0 comments on commit 841cb77

Please sign in to comment.