Skip to content

Commit

Permalink
Remove unnecessary return, refactor permission check so that it doesn…
Browse files Browse the repository at this point in the history
…'t hold locks longer than needed

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Jun 13, 2023
1 parent 3fef0ed commit 3b07f43
Showing 1 changed file with 41 additions and 15 deletions.
56 changes: 41 additions & 15 deletions server/mqtt.go
Expand Up @@ -23,6 +23,7 @@ import (
"io"
"net"
"net/http"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -1875,7 +1876,7 @@ func (as *mqttAccountSessionManager) sendJSAPIrequests(s *Server, c *client, acc
// or 0 if the record was added instead of updated.
//
// Lock not held on entry.
func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetainedMsgRef) uint64 {
func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetainedMsgRef) {
as.mu.Lock()
defer as.mu.Unlock()
if as.retmsgs == nil {
Expand All @@ -1887,10 +1888,9 @@ func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetai
// If the new sequence is below the floor or the existing one,
// then ignore the new one.
if rm.sseq <= erm.sseq || rm.sseq <= erm.floor {
return 0
return
}
// Capture existing sequence number so we can return it as the old sequence.
oldSeq := erm.sseq
erm.sseq = rm.sseq
// Clear the floor
erm.floor = 0
Expand All @@ -1900,13 +1900,11 @@ func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetai
erm.sub = &subscription{subject: []byte(key)}
as.sl.Insert(erm.sub)
}
return oldSeq
}
}
rm.sub = &subscription{subject: []byte(key)}
as.retmsgs[key] = rm
as.sl.Insert(rm.sub)
return 0
}

// Removes the retained message for the given `subject` if present, and returns the
Expand Down Expand Up @@ -3256,15 +3254,43 @@ func (s *Server) mqttCheckPubRetainedPerms() {
}
s.mu.Unlock()

// First get a list of all of the sessions.
sm.mu.RLock()
defer sm.mu.RUnlock()

asms := make([]*mqttAccountSessionManager, 0, len(sm.sessions))
for _, asm := range sm.sessions {
asms = append(asms, asm)
}
sm.mu.RUnlock()

type retainedMsg struct {
subj string
rmsg *mqttRetainedMsgRef
}

// For each session we will obtain a list of retained messages.
var _rms [128]retainedMsg
rms := _rms[:0]
for _, asm := range asms {
// Get all of the retained messages. Then we will sort them so
// that they are in sequence order, which should help the file
// store to not have to load out-of-order blocks so often.
asm.mu.Lock()
rms = rms[:0] // reuse slice
for subj, rf := range asm.retmsgs {
rms = append(rms, retainedMsg{
subj: subj,
rmsg: rf,
})
}
asm.mu.Unlock()
sort.Slice(rms, func(i, j int) bool {
return rms[i].rmsg.sseq < rms[j].rmsg.sseq
})

perms := map[string]*perm{}
deletes := map[string]uint64{}
asm.mu.Lock()
for subject, rf := range asm.retmsgs {
jsm, err := asm.jsa.loadMsg(mqttRetainedMsgsStreamName, rf.sseq)
for _, rf := range rms {
jsm, err := asm.jsa.loadMsg(mqttRetainedMsgsStreamName, rf.rmsg.sseq)
if err != nil || jsm == nil {
continue
}
Expand All @@ -3285,20 +3311,20 @@ func (s *Server) mqttCheckPubRetainedPerms() {
}
// If there is permission and no longer allowed to publish in
// the subject, remove the publish retained message from the map.
if p != nil && !pubAllowed(p, subject) {
if p != nil && !pubAllowed(p, rf.subj) {
u = nil
}
}

// Not present or permissions have changed such that the source can't
// publish on that subject anymore: remove it from the map.
if u == nil {
delete(asm.retmsgs, subject)
asm.sl.Remove(rf.sub)
deletes[subject] = rf.sseq
delete(asm.retmsgs, rf.subj)
asm.sl.Remove(rf.rmsg.sub)
deletes[rf.subj] = rf.rmsg.sseq
}
}
asm.mu.Unlock()

for subject, seq := range deletes {
asm.deleteRetainedMsg(seq)
asm.notifyRetainedMsgDeleted(subject, seq)
Expand Down

0 comments on commit 3b07f43

Please sign in to comment.