Skip to content

Commit

Permalink
Changes to hunt timeout issue
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Mar 14, 2024
1 parent 16d31bc commit f0140a4
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 3 deletions.
3 changes: 2 additions & 1 deletion service/app/handler_process_saved_event.go
Expand Up @@ -26,7 +26,8 @@ var (
)
)

const sendEventToRelayTimeout = 6 * time.Second
// const sendEventToRelayTimeout = 6 * time.Second
const sendEventToRelayTimeout = 30 * time.Second

type ProcessSavedEvent struct {
id domain.EventId
Expand Down
3 changes: 3 additions & 0 deletions service/domain/downloader/downloader.go
Expand Up @@ -178,6 +178,9 @@ func (d *Downloader) updateDownloaders(ctx context.Context) error {
return errors.Wrap(err, "error getting relays")
}

// Test if removing the relay for reads solves the timeout issue
relays.Delete(domain.MustNewRelayAddress("wss://relay.nos.social"))

d.relayDownloadersLock.Lock()
defer d.relayDownloadersLock.Unlock()

Expand Down
21 changes: 19 additions & 2 deletions service/domain/relays/relay_connection.go
Expand Up @@ -101,10 +101,17 @@ func (r *RelayConnection) Run(ctx context.Context) {

r.metrics.ReportRelayDisconnection(r.connectionFactory.Address(), err)

backoff := r.backoffManager.GetReconnectionBackoff(err)

// We control relay.nos.social, so we can be more aggressive with the backoff
if r.Address().String() == "wss://relay.nos.social" {
backoff = 1 * time.Minute
}

select {
case <-ctx.Done():
return
case <-time.After(r.backoffManager.GetReconnectionBackoff(err)):
case <-time.After(backoff):
continue
}
}
Expand Down Expand Up @@ -150,7 +157,16 @@ func (r *RelayConnection) GetEvents(ctx context.Context, filter domain.Filter) (
return ch, nil
}

// pushes the event to the eventsToSend channel and blocks until a sendEventResponse is received
// SendEvent schedules an event to be sent to the relay. It does so by creating
// and adding an eventToSendRequest to the `eventsToSend` map, which manages
// pending requests for each event. This approach prevents multiple requests for
// sending the same event from being initiated. Each eventToSendRequest includes
// a `ch` channel, which receives a sendEventResponse indicating the success or
// failure of the send operation. This success signal is send to the ch channel
// through the `passSendEventResponseToChannel` function after an OK message is
// received. Additionally, the event is enqueued into `newEventsCh`, a channel
// that is monitored by a loop within `sendEvents`, to manage the actual
// transmission of the event to the relay.
func (r *RelayConnection) SendEvent(ctx context.Context, event domain.Event) error {
ctx, cancel := context.WithTimeout(ctx, sendEventTimeout)
defer cancel()
Expand Down Expand Up @@ -303,6 +319,7 @@ func (r *RelayConnection) run(ctx context.Context) error {
}()

go func() {
// Loops through newEventsCh and sends events to the relay
if err := r.sendEvents(ctx, conn); err != nil {
if !r.writeErrorShouldNotBeLogged(err) {
r.logger.Error().
Expand Down

0 comments on commit f0140a4

Please sign in to comment.