Skip to content

Commit

Permalink
[IMPROVED] Optimize locking for consumer info API (#4615)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Oct 2, 2023
2 parents 4165f86 + e4ca15c commit e42b8ce
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions server/jetstream_api.go
Expand Up @@ -4164,9 +4164,20 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
js.mu.RLock()
isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName)
ourID := cc.meta.ID()
var offline bool
var rg *raftGroup
var offline, isMember bool
if ca != nil {
offline = s.allPeersOffline(ca.Group)
if rg = ca.Group; rg != nil {
offline = s.allPeersOffline(rg)
isMember = rg.isMember(ourID)
}
}
// Capture consumer leader here.
isConsumerLeader := cc.isConsumerLeader(acc.Name, streamName, consumerName)
// Also capture if we think there is no meta leader.
var isLeaderLess bool
if !isLeader {
isLeaderLess = cc.meta.GroupLeader() == _EMPTY_ && time.Since(cc.meta.Created()) > lostQuorumIntervalDefault
}
js.mu.RUnlock()

Expand All @@ -4189,7 +4200,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
} else if ca == nil {
if js.isLeaderless() {
if isLeaderLess {
resp.Error = NewJSClusterNotAvailError()
// Delaying an error response gives the leader a chance to respond before us
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil)
Expand All @@ -4202,38 +4213,35 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
}

// Check to see if we are a member of the group and if the group has no leader.
if js.isGroupLeaderless(ca.Group) {
if isMember && js.isGroupLeaderless(ca.Group) {
resp.Error = NewJSClusterNotAvailError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

// We have the consumer assigned and a leader, so only the consumer leader should answer.
if !acc.JetStreamIsConsumerLeader(streamName, consumerName) {
if js.isLeaderless() {
if !isConsumerLeader {
if isLeaderLess {
resp.Error = NewJSClusterNotAvailError()
// Delaying an error response gives the leader a chance to respond before us
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), ca.Group)
return
}

var node RaftNode
var leaderNotPartOfGroup bool

// We have a consumer assignment.
js.mu.RLock()
var (
node RaftNode
leaderNotPartOfGroup bool
isMember bool
)
rg := ca.Group
if rg != nil && rg.isMember(ourID) {
isMember = true
if isMember {
js.mu.RLock()
if rg.node != nil {
node = rg.node
if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) {
leaderNotPartOfGroup = true
}
}
js.mu.RUnlock()
}
js.mu.RUnlock()

// Check if we should ignore all together.
if node == nil {
Expand Down

0 comments on commit e42b8ce

Please sign in to comment.