Skip to content

Commit

Permalink
simplify connectedness events
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed May 2, 2024
1 parent 2681035 commit c68a035
Showing 1 changed file with 33 additions and 71 deletions.
104 changes: 33 additions & 71 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,8 @@ type Swarm struct {

conns struct {
sync.RWMutex
m map[peer.ID][]*Conn
connectednessEventQueue map[peer.ID][]network.Connectedness
lastConnectednessEvent map[peer.ID]network.Connectedness
m map[peer.ID][]*Conn
connectednessEvents chan peer.ID
}

listeners struct {
Expand Down Expand Up @@ -240,8 +239,7 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
}

s.conns.m = make(map[peer.ID][]*Conn)
s.conns.connectednessEventQueue = make(map[peer.ID][]network.Connectedness)
s.conns.lastConnectednessEvent = make(map[peer.ID]network.Connectedness)
s.conns.connectednessEvents = make(chan peer.ID, 32)
s.listeners.m = make(map[transport.Listener]struct{})
s.transports.m = make(map[int]transport.Transport)
s.notifs.m = make(map[network.Notifiee]struct{})
Expand Down Expand Up @@ -308,17 +306,10 @@ func (s *Swarm) close() {

// Wait for everything to finish.
s.refs.Wait()
close(s.connectednessEventCh)
close(s.conns.connectednessEvents)
<-s.connectednessEmitterDone
s.emitter.Close()

// Remove the connectedness map only after we have closed the connection and sent all the disconnection
// events
s.conns.Lock()
s.conns.connectednessEventQueue = nil
s.conns.lastConnectednessEvent = nil
s.conns.Unlock()

// Now close out any transports (if necessary). Do this after closing
// all connections/listeners.
s.transports.Lock()
Expand Down Expand Up @@ -402,8 +393,6 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,

c.streams.m = make(map[*Stream]struct{})
s.conns.m[p] = append(s.conns.m[p], c)
s.maybeEnqueueConnectednessUnlocked(p)

// Add two swarm refs:
// * One will be decremented after the close notifications fire in Conn.doClose
// * The other will be decremented when Conn.start exits.
Expand All @@ -414,6 +403,13 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
c.notifyLk.Lock()
s.conns.Unlock()

// Block this goroutine till this request is enqueued.
// This ensures that there are only a finite number of goroutines that are waiting to send
// the connectedness event on the disconnection side in swarm.removeConn.
// This is so because the goroutine to enqueue disconnection event can only be started
// from either a subscriber or a notifier or after calling c.start
s.conns.connectednessEvents <- p

if !isLimited {
// Notify goroutines waiting for a direct connection
//
Expand Down Expand Up @@ -790,68 +786,34 @@ func (s *Swarm) removeConn(c *Conn) {
}
}
}
s.maybeEnqueueConnectednessUnlocked(p)
s.conns.Unlock()
}

func (s *Swarm) lastConnectednessEventUnlocked(p peer.ID) network.Connectedness {
events := s.conns.connectednessEventQueue[p]
if len(events) > 0 {
return events[len(events)-1]
}
return s.conns.lastConnectednessEvent[p]
}

func (s *Swarm) maybeEnqueueConnectednessUnlocked(p peer.ID) {
oldState := s.lastConnectednessEventUnlocked(p)
newState := s.connectednessUnlocked(p)
if oldState != newState {
if s.conns.connectednessEventQueue != nil {
s.conns.connectednessEventQueue[p] = append(s.conns.connectednessEventQueue[p], newState)
select {
case s.connectednessEventCh <- struct{}{}:
default:
}
} else {
log.Errorf("SWARM BUG: nil connectedness map")
}
}
// Do this in a separate go routine to not block the caller.
// This ensures that if a event subscriber closes the connection from the subscription goroutine
// this doesn't deadlock
s.refs.Add(1)
go func() {
defer s.refs.Done()
s.conns.connectednessEvents <- p
}()
}

func (s *Swarm) connectednessEventEmitter() {
defer close(s.connectednessEmitterDone)
for range s.connectednessEventCh {
for {
var c network.Connectedness
var peer peer.ID
s.conns.Lock()
for p, v := range s.conns.connectednessEventQueue {
if len(v) == 0 {
// this shouldn't happen
delete(s.conns.connectednessEventQueue, p)
log.Errorf("SWARM BUG: empty connectedness event slice %v %v", p, v)
continue
}
c = v[0]
peer = p
s.conns.connectednessEventQueue[p] = v[1:]
if len(s.conns.connectednessEventQueue[p]) == 0 {
delete(s.conns.connectednessEventQueue, p)
}
if c == network.NotConnected {
delete(s.conns.lastConnectednessEvent, p)
} else {
s.conns.lastConnectednessEvent[p] = c
}
break
}
s.conns.Unlock()
if peer == "" {
break
}
lastConnectednessEvents := make(map[peer.ID]network.Connectedness)
for p := range s.conns.connectednessEvents {
s.conns.Lock()
oldState := lastConnectednessEvents[p]
newState := s.connectednessUnlocked(p)
if newState != network.NotConnected {
lastConnectednessEvents[p] = newState
} else {
delete(lastConnectednessEvents, p)
}
s.conns.Unlock()
if newState != oldState {
s.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: peer,
Connectedness: c,
Peer: p,
Connectedness: newState,
})
}
}
Expand Down

0 comments on commit c68a035

Please sign in to comment.