Skip to content

Commit

Permalink
Add in utility to detect and delete any NRG orphans. (#4510)
Browse files Browse the repository at this point in the history
We do not expect this condition at all, but we did see it in the wild, so
this utility will detect and clean up any orphans. It will run on the same
timer (2m) as the general health checks run from `monitorCluster()`.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 12, 2023
2 parents 01aab6d + 9531611 commit e378f04
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 3 deletions.
73 changes: 72 additions & 1 deletion server/jetstream_cluster.go
Expand Up @@ -1167,6 +1167,65 @@ func (js *jetStream) checkForOrphans() {
}
}

// checkForNRGOrphans scans the raft nodes running on this server for any
// that no longer correspond to a meta-layer assignment and deletes them.
// We do not expect this situation to arise, but it has been seen in the wild.
func (s *Server) checkForNRGOrphans() {
	js, cc := s.getJetStreamCluster()
	// Without a cluster there are no NRGs, and while the meta state is
	// still recovering our assignment view can not be trusted.
	if js == nil || cc == nil || js.isMetaRecovering() {
		return
	}

	// Set of group names that legitimately belong on this server.
	expected := make(map[string]struct{})
	noteGroup := func(rg *raftGroup) {
		// Only groups with an attached node (R>1) are real NRGs here.
		if rg.node != nil {
			expected[rg.Name] = struct{}{}
		}
	}

	js.mu.RLock()
	meta := cc.meta
	if meta == nil {
		// No meta node yet, nothing to compare against.
		js.mu.RUnlock()
		return
	}

	ourID := meta.ID()
	// The meta group itself is always legitimate.
	expected[meta.Group()] = struct{}{}

	// Walk all stream assignments and record every group we are a member of,
	// including the consumer groups hanging off each stream.
	for _, asa := range cc.streams {
		for _, sa := range asa {
			if sa.Group.isMember(ourID) && sa.Restore == nil {
				noteGroup(sa.Group)
				for _, ca := range sa.consumers {
					if ca.Group.isMember(ourID) {
						noteGroup(ca.Group)
					}
				}
			}
		}
	}
	js.mu.RUnlock()

	// Anything currently running that we did not record above is an orphan.
	var orphans []RaftNode
	s.rnMu.RLock()
	for name, n := range s.raftNodes {
		if _, ok := expected[name]; !ok {
			orphans = append(orphans, n)
		}
	}
	s.rnMu.RUnlock()

	// Delete outside of the lock.
	for _, n := range orphans {
		s.Warnf("Detected orphaned NRG %q, will cleanup", n.Group())
		n.Delete()
	}
}

func (js *jetStream) monitorCluster() {
s, n := js.server(), js.getMetaGroup()
qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ()
Expand Down Expand Up @@ -1197,6 +1256,8 @@ func (js *jetStream) monitorCluster() {
if hs := s.healthz(nil); hs.Error != _EMPTY_ {
s.Warnf("%v", hs.Error)
}
// Also check for orphaned NRGs.
s.checkForNRGOrphans()
}

var (
Expand Down Expand Up @@ -1277,7 +1338,6 @@ func (js *jetStream) monitorCluster() {
go checkHealth()
continue
}
// FIXME(dlc) - Deal with errors.
if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
_, nb := n.Applied(ce.Index)
if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) {
Expand All @@ -1288,6 +1348,8 @@ func (js *jetStream) monitorCluster() {
doSnapshot()
}
ce.ReturnToPool()
} else {
s.Warnf("Error applying JetStream cluster entries: %v", err)
}
}
aq.recycle(&ces)
Expand Down Expand Up @@ -2037,6 +2099,15 @@ func (mset *stream) removeNode() {
}
}

// clearRaftNode detaches the raft node reference from the stream under the
// stream lock. Safe to call on a nil receiver, in which case it is a no-op.
func (mset *stream) clearRaftNode() {
	if mset == nil {
		return
	}
	mset.mu.Lock()
	mset.node = nil
	mset.mu.Unlock()
}

// Helper function to generate peer info.
// lists and sets for old and new.
func genPeerInfo(peers []string, split int) (newPeers, oldPeers []string, newPeerSet, oldPeerSet map[string]bool) {
Expand Down
50 changes: 49 additions & 1 deletion server/jetstream_cluster_3_test.go
Expand Up @@ -5555,7 +5555,7 @@ func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Nowmal Stream
// Normal Stream
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Expand Down Expand Up @@ -5634,3 +5634,51 @@ func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) {
require_True(t, blkSize(fs) == defaultMediumBlockSize)
}
}

// Verify that checkForNRGOrphans detects raft nodes with no backing
// meta-layer assignment and removes them, leaving only the meta NRG.
func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Create a replicated stream with a durable consumer.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"*"},
		Replicas: 3,
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "DC",
		AckPolicy: nats.AckExplicitPolicy,
	})
	require_NoError(t, err)

	// Pick a follower and manufacture orphans on it by detaching the raft
	// nodes from both the stream and the consumer before deletion.
	srv := c.randomNonStreamLeader(globalAccountName, "TEST")

	mset, err := srv.GlobalAccount().lookupStream("TEST")
	require_NoError(t, err)
	streamGroup := mset.raftNode().Group()
	mset.clearRaftNode()

	o := mset.lookupConsumer("DC")
	require_True(t, o != nil)
	consumerGroup := o.raftNode().Group()
	o.clearRaftNode()

	require_NoError(t, js.DeleteStream("TEST"))

	// The detached nodes should have survived the delete as orphans.
	require_True(t, srv.numRaftNodes() > 1)

	// Run detection and cleanup.
	srv.checkForNRGOrphans()

	// Only the meta NRG should remain now.
	require_True(t, srv.numRaftNodes() == 1)
	require_True(t, srv.lookupRaftNode(streamGroup) == nil)
	require_True(t, srv.lookupRaftNode(consumerGroup) == nil)
}
3 changes: 2 additions & 1 deletion server/monitor.go
Expand Up @@ -3225,7 +3225,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
}
details := opts.Details
defer func() {
// for response with details enabled, ses status to either "error" or "ok"
// for response with details enabled, set status to either "error" or "ok"
if details {
if len(health.Errors) != 0 {
health.Status = "error"
Expand Down Expand Up @@ -3492,6 +3492,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
var streams map[string]map[string]*streamAssignment
js.mu.RLock()
if opts.Account == _EMPTY_ {
// Collect all relevant streams and consumers.
streams = make(map[string]map[string]*streamAssignment, len(cc.streams))
for acc, asa := range cc.streams {
nasa := make(map[string]*streamAssignment)
Expand Down

0 comments on commit e378f04

Please sign in to comment.