Skip to content

Commit

Permalink
Add in utility to detect and delete any NRG orphans.
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 12, 2023
1 parent 01aab6d commit 9531611
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 3 deletions.
73 changes: 72 additions & 1 deletion server/jetstream_cluster.go
Expand Up @@ -1167,6 +1167,65 @@ func (js *jetStream) checkForOrphans() {
}
}

// checkForNRGOrphans looks for raft groups (NRGs) registered on this server
// that match neither the meta group nor any stream or consumer assignment this
// server is a member of, and deletes each one after logging a warning.
func (s *Server) checkForNRGOrphans() {
	js, cc := s.getJetStreamCluster()
	switch {
	case js == nil, cc == nil:
		// No cluster means no NRGs.
		return
	case js.isMetaRecovering():
		// Do nothing while the meta layer is still recovering.
		return
	}

	// Names of every group that is allowed to be running here.
	expected := make(map[string]struct{})
	// keep records a group as legitimate, but only when it has a raft node
	// attached (per the original author's note, these are the R>1 assets).
	keep := func(rg *raftGroup) {
		if rg.node == nil {
			return
		}
		expected[rg.Name] = struct{}{}
	}

	js.mu.RLock()
	meta := cc.meta
	if meta == nil {
		// Nothing to compare against without a meta node.
		js.mu.RUnlock()
		return
	}

	// The meta group itself is always expected.
	self := meta.ID()
	expected[meta.Group()] = struct{}{}

	// Walk every assignment and remember the groups we are a member of.
	for _, accStreams := range cc.streams {
		for _, sa := range accStreams {
			// Skip streams we are not part of and ones whose Restore field is set.
			if !sa.Group.isMember(self) || sa.Restore != nil {
				continue
			}
			keep(sa.Group)
			for _, ca := range sa.consumers {
				if !ca.Group.isMember(self) {
					continue
				}
				keep(ca.Group)
			}
		}
	}
	js.mu.RUnlock()

	// Gather the unexpected nodes under the raft node lock and delete them
	// only once that lock has been released.
	var orphans []RaftNode
	s.rnMu.RLock()
	for group, node := range s.raftNodes {
		if _, found := expected[group]; found {
			continue
		}
		orphans = append(orphans, node)
	}
	s.rnMu.RUnlock()

	for i := range orphans {
		node := orphans[i]
		s.Warnf("Detected orphaned NRG %q, will cleanup", node.Group())
		node.Delete()
	}
}

func (js *jetStream) monitorCluster() {
s, n := js.server(), js.getMetaGroup()
qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ()
Expand Down Expand Up @@ -1197,6 +1256,8 @@ func (js *jetStream) monitorCluster() {
if hs := s.healthz(nil); hs.Error != _EMPTY_ {
s.Warnf("%v", hs.Error)
}
// Also check for orphaned NRGs.
s.checkForNRGOrphans()
}

var (
Expand Down Expand Up @@ -1277,7 +1338,6 @@ func (js *jetStream) monitorCluster() {
go checkHealth()
continue
}
// FIXME(dlc) - Deal with errors.
if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
_, nb := n.Applied(ce.Index)
if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) {
Expand All @@ -1288,6 +1348,8 @@ func (js *jetStream) monitorCluster() {
doSnapshot()
}
ce.ReturnToPool()
} else {
s.Warnf("Error applying JetStream cluster entries: %v", err)
}
}
aq.recycle(&ces)
Expand Down Expand Up @@ -2037,6 +2099,15 @@ func (mset *stream) removeNode() {
}
}

// clearRaftNode sets the stream's node field to nil while holding the stream
// lock. It only drops the reference held by the stream; nothing is called on
// the node itself. Calling it on a nil stream is a no-op.
func (mset *stream) clearRaftNode() {
	if mset != nil {
		mset.mu.Lock()
		mset.node = nil
		mset.mu.Unlock()
	}
}

// Helper function to generate peer info.
// lists and sets for old and new.
func genPeerInfo(peers []string, split int) (newPeers, oldPeers []string, newPeerSet, oldPeerSet map[string]bool) {
Expand Down
50 changes: 49 additions & 1 deletion server/jetstream_cluster_3_test.go
Expand Up @@ -5555,7 +5555,7 @@ func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Nowmal Stream
// Normal Stream
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Expand Down Expand Up @@ -5634,3 +5634,51 @@ func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) {
require_True(t, blkSize(fs) == defaultMediumBlockSize)
}
}

// TestJetStreamClusterDetectOrphanNRGs creates a replicated stream and a
// durable consumer, detaches their raft nodes from the in-memory stream and
// consumer on one server so that deleting the stream leaves those raft groups
// behind, and then verifies that checkForNRGOrphans removes them.
func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// A regular R3 stream.
	streamCfg := &nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"*"},
		Replicas: 3,
	}
	_, err := js.AddStream(streamCfg)
	require_NoError(t, err)

	// A durable consumer on that stream.
	consumerCfg := &nats.ConsumerConfig{
		Durable:   "DC",
		AckPolicy: nats.AckExplicitPolicy,
	}
	_, err = js.AddConsumer("TEST", consumerCfg)
	require_NoError(t, err)

	// Pick a server that is not the stream leader; the orphans are forced there.
	s := c.randomNonStreamLeader(globalAccountName, "TEST")

	// Remember the stream's raft group name, then detach the node from the stream.
	mset, err := s.GlobalAccount().lookupStream("TEST")
	require_NoError(t, err)
	streamGroup := mset.raftNode().Group()
	mset.clearRaftNode()

	// Same for the consumer.
	o := mset.lookupConsumer("DC")
	require_True(t, o != nil)
	consumerGroup := o.raftNode().Group()
	o.clearRaftNode()

	require_NoError(t, js.DeleteStream("TEST"))

	// More than just the meta group must still be registered, i.e. orphans exist.
	require_True(t, s.numRaftNodes() > 1)

	// Detect the orphans and clean them up.
	s.checkForNRGOrphans()

	// Only the meta NRG should remain, and both recorded groups must be gone.
	require_True(t, s.numRaftNodes() == 1)
	require_True(t, s.lookupRaftNode(streamGroup) == nil)
	require_True(t, s.lookupRaftNode(consumerGroup) == nil)
}
3 changes: 2 additions & 1 deletion server/monitor.go
Expand Up @@ -3225,7 +3225,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
}
details := opts.Details
defer func() {
// for response with details enabled, ses status to either "error" or "ok"
// for response with details enabled, set status to either "error" or "ok"
if details {
if len(health.Errors) != 0 {
health.Status = "error"
Expand Down Expand Up @@ -3492,6 +3492,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
var streams map[string]map[string]*streamAssignment
js.mu.RLock()
if opts.Account == _EMPTY_ {
// Collect all relevant streams and consumers.
streams = make(map[string]map[string]*streamAssignment, len(cc.streams))
for acc, asa := range cc.streams {
nasa := make(map[string]*streamAssignment)
Expand Down

0 comments on commit 9531611

Please sign in to comment.