Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add in utility to detect and delete any NRG orphans. #4510

Merged
merged 1 commit into from Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
73 changes: 72 additions & 1 deletion server/jetstream_cluster.go
Expand Up @@ -1167,6 +1167,65 @@ func (js *jetStream) checkForOrphans() {
}
}

// Check and delete any orphans we may come across.
func (s *Server) checkForNRGOrphans() {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil || js.isMetaRecovering() {
// No cluster means no NRGs. Also return if still recovering.
return
}

// Track which assets R>1 should be on this server.
nrgMap := make(map[string]struct{})
trackGroup := func(rg *raftGroup) {
// If R>1 track this as a legit NRG.
if rg.node != nil {
nrgMap[rg.Name] = struct{}{}
}
}
// Register our meta.
js.mu.RLock()
meta := cc.meta
if meta == nil {
js.mu.RUnlock()
// Bail with no meta node.
return
}

ourID := meta.ID()
nrgMap[meta.Group()] = struct{}{}

// Collect all valid groups from our assignments.
for _, asa := range cc.streams {
for _, sa := range asa {
if sa.Group.isMember(ourID) && sa.Restore == nil {
trackGroup(sa.Group)
for _, ca := range sa.consumers {
if ca.Group.isMember(ourID) {
trackGroup(ca.Group)
}
}
}
}
}
js.mu.RUnlock()

// Check NRGs that are running.
var needDelete []RaftNode
s.rnMu.RLock()
for name, n := range s.raftNodes {
if _, ok := nrgMap[name]; !ok {
needDelete = append(needDelete, n)
}
}
s.rnMu.RUnlock()

for _, n := range needDelete {
s.Warnf("Detected orphaned NRG %q, will cleanup", n.Group())
n.Delete()
}
}

func (js *jetStream) monitorCluster() {
s, n := js.server(), js.getMetaGroup()
qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ()
Expand Down Expand Up @@ -1197,6 +1256,8 @@ func (js *jetStream) monitorCluster() {
if hs := s.healthz(nil); hs.Error != _EMPTY_ {
s.Warnf("%v", hs.Error)
}
// Also check for orphaned NRGs.
s.checkForNRGOrphans()
}

var (
Expand Down Expand Up @@ -1277,7 +1338,6 @@ func (js *jetStream) monitorCluster() {
go checkHealth()
continue
}
// FIXME(dlc) - Deal with errors.
if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
_, nb := n.Applied(ce.Index)
if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) {
Expand All @@ -1288,6 +1348,8 @@ func (js *jetStream) monitorCluster() {
doSnapshot()
}
ce.ReturnToPool()
} else {
s.Warnf("Error applying JetStream cluster entries: %v", err)
}
}
aq.recycle(&ces)
Expand Down Expand Up @@ -2037,6 +2099,15 @@ func (mset *stream) removeNode() {
}
}

func (mset *stream) clearRaftNode() {
if mset == nil {
return
}
mset.mu.Lock()
defer mset.mu.Unlock()
mset.node = nil
}

// Helper function to generate peer info.
// lists and sets for old and new.
func genPeerInfo(peers []string, split int) (newPeers, oldPeers []string, newPeerSet, oldPeerSet map[string]bool) {
Expand Down
50 changes: 49 additions & 1 deletion server/jetstream_cluster_3_test.go
Expand Up @@ -5555,7 +5555,7 @@ func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Nowmal Stream
// Normal Stream
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Expand Down Expand Up @@ -5634,3 +5634,51 @@ func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) {
require_True(t, blkSize(fs) == defaultMediumBlockSize)
}
}

func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Normal Stream
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Replicas: 3,
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "DC",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

// We will force an orphan for a certain server.
s := c.randomNonStreamLeader(globalAccountName, "TEST")

mset, err := s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
sgn := mset.raftNode().Group()
mset.clearRaftNode()

o := mset.lookupConsumer("DC")
require_True(t, o != nil)
ogn := o.raftNode().Group()
o.clearRaftNode()

require_NoError(t, js.DeleteStream("TEST"))

// Check that we do in fact have orphans.
require_True(t, s.numRaftNodes() > 1)

// This function will detect orphans and clean them up.
s.checkForNRGOrphans()

// Should only be meta NRG left.
require_True(t, s.numRaftNodes() == 1)
require_True(t, s.lookupRaftNode(sgn) == nil)
require_True(t, s.lookupRaftNode(ogn) == nil)
}
3 changes: 2 additions & 1 deletion server/monitor.go
Expand Up @@ -3225,7 +3225,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
}
details := opts.Details
defer func() {
// for response with details enabled, ses status to either "error" or "ok"
// for response with details enabled, set status to either "error" or "ok"
if details {
if len(health.Errors) != 0 {
health.Status = "error"
Expand Down Expand Up @@ -3492,6 +3492,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
var streams map[string]map[string]*streamAssignment
js.mu.RLock()
if opts.Account == _EMPTY_ {
// Collect all relevant streams and consumers.
streams = make(map[string]map[string]*streamAssignment, len(cc.streams))
for acc, asa := range cc.streams {
nasa := make(map[string]*streamAssignment)
Expand Down