Skip to content

Commit

Permalink
[IMPROVED] Finer grained locking for /healthz with large number of Je…
Browse files Browse the repository at this point in the history
…tStream assets. (#4031)

For checking the health of jetstream, do not hold the lock as we
traverse the streams and consumers.

 Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Apr 6, 2023
2 parents 33451e5 + c16915b commit 02122a2
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 51 deletions.
46 changes: 11 additions & 35 deletions server/jetstream_cluster.go
Expand Up @@ -436,63 +436,39 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {

// isStreamHealthy will determine if the stream is up to date or very close.
// For R1 it will make sure the stream is present on this server.
// Read lock should be held.
func (js *jetStream) isStreamHealthy(account, stream string) bool {
func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
js.mu.RLock()
defer js.mu.RUnlock()

cc := js.cluster
if cc == nil {
// Non-clustered mode
return true
}
as := cc.streams[account]
if as == nil {
return false
}
sa := as[stream]
if sa == nil {
return false
}
rg := sa.Group
if rg == nil {
return false
}

if rg.node == nil || rg.node.Healthy() {
if rg := sa.Group; rg != nil && (rg.node == nil || rg.node.Healthy()) {
// Check if we are processing a snapshot and are catching up.
acc, err := cc.s.LookupAccount(account)
if err != nil {
return false
}
mset, err := acc.lookupStream(stream)
if err != nil {
return false
}
if mset.isCatchingUp() {
return false
if mset, err := acc.lookupStream(sa.Config.Name); err == nil && !mset.isCatchingUp() {
return true
}
// Success.
return true
}

return false
}

// isConsumerCurrent will determine if the consumer is up to date.
// For R1 it will make sure the consunmer is present on this server.
// Read lock should be held.
func (js *jetStream) isConsumerCurrent(account, stream, consumer string) bool {
func (js *jetStream) isConsumerCurrent(mset *stream, consumer string, ca *consumerAssignment) bool {
js.mu.RLock()
defer js.mu.RUnlock()

cc := js.cluster
if cc == nil {
// Non-clustered mode
return true
}
acc, err := cc.s.LookupAccount(account)
if err != nil {
return false
}
mset, err := acc.lookupStream(stream)
if err != nil {
return false
}
o := mset.lookupConsumer(consumer)
if o == nil {
return false
Expand Down
53 changes: 37 additions & 16 deletions server/monitor.go
@@ -1,4 +1,4 @@
// Copyright 2013-2022 The NATS Authors
// Copyright 2013-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -3126,33 +3126,54 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
// If they are assigned to this server check their status.
ourID := meta.ID()

// TODO(dlc) - Might be better here to not hold the lock the whole time.
// Copy the meta layer so we do not need to hold the js read lock for an extended period of time.
js.mu.RLock()
defer js.mu.RUnlock()

streams := make(map[string]map[string]*streamAssignment, len(cc.streams))
for acc, asa := range cc.streams {
nasa := make(map[string]*streamAssignment)
for stream, sa := range asa {
if sa.Group.isMember(ourID) {
// Make sure we can look up
if !js.isStreamHealthy(acc, stream) {
health.Status = na
health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", acc, stream)
return health
}
// Now check consumers.
csa := sa.copyGroup()
csa.consumers = make(map[string]*consumerAssignment)
for consumer, ca := range sa.consumers {
if ca.Group.isMember(ourID) {
if !js.isConsumerCurrent(acc, stream, consumer) {
health.Status = na
health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer)
return health
}
csa.consumers[consumer] = ca.copyGroup()
}
}
nasa[stream] = csa
}
}
streams[acc] = nasa
}
js.mu.RUnlock()

// Use our copy to traverse so we do not need to hold the js lock.
for accName, asa := range streams {
acc, err := s.LookupAccount(accName)
if err != nil && len(asa) > 0 {
health.Status = na
health.Error = fmt.Sprintf("JetStream can not lookup account %q: %v", accName, err)
return health
}

for stream, sa := range asa {
// Make sure we can look up
if !js.isStreamHealthy(acc, sa) {
health.Status = na
health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", accName, stream)
return health
}
mset, _ := acc.lookupStream(stream)
// Now check consumers.
for consumer, ca := range sa.consumers {
if !js.isConsumerCurrent(mset, consumer, ca) {
health.Status = na
health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer)
return health
}
}
}
}
// Success.
return health
}

0 comments on commit 02122a2

Please sign in to comment.