Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] System stability improvements on restarts with corrupt metadata. #3934

Merged
merged 4 commits into from Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
67 changes: 57 additions & 10 deletions server/jetstream_cluster.go
Expand Up @@ -956,6 +956,42 @@ type recoveryUpdates struct {
updateConsumers map[string]*consumerAssignment
}

// Called after recovery of the cluster on startup to check for any orphans.
// Streams and consumers are recovered from disk, and the meta layer's mappings
// should clean them up, but under crash scenarios there could be orphans.
// Called after recovery of the cluster on startup to check for any orphans.
// Streams and consumers are recovered from disk, and the meta layer's mappings
// should clean them up, but under crash scenarios there could be orphans.
func (js *jetStream) checkForOrphans() {
	js.mu.Lock()
	defer js.mu.Unlock()

	// Grab a consumer's name under its own read lock.
	nameOf := func(o *consumer) string {
		o.mu.RLock()
		defer o.mu.RUnlock()
		return o.name
	}

	s, cc := js.srv, js.cluster
	s.Debugf("JetStream cluster checking for orphans")

	for accName, jsa := range js.accounts {
		asa := cc.streams[accName]
		for stream, mset := range jsa.streams {
			sa := asa[stream]
			if sa == nil {
				// No meta-layer assignment for this recovered stream.
				s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream)
				mset.delete()
				continue
			}
			// Stream assignment exists; verify each running consumer against it.
			for _, o := range mset.getConsumers() {
				cname := nameOf(o)
				if sa.consumers[cname] == nil {
					s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, cname)
					o.delete()
				}
			}
		}
	}
}

func (js *jetStream) monitorCluster() {
s, n := js.server(), js.getMetaGroup()
qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ()
Expand Down Expand Up @@ -1001,7 +1037,7 @@ func (js *jetStream) monitorCluster() {
if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
} else if err != errNoSnapAvailable {
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
}
}
Expand Down Expand Up @@ -1050,6 +1086,7 @@ func (js *jetStream) monitorCluster() {
// Clear.
ru = nil
s.Debugf("Recovered JetStream cluster metadata")
js.checkForOrphans()
continue
}
// FIXME(dlc) - Deal with errors.
Expand All @@ -1070,9 +1107,9 @@ func (js *jetStream) monitorCluster() {

if isLeader {
s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil)
// Optionally install a snapshot as we become leader.
doSnapshot()
// Install a snapshot as we become leader.
js.checkClusterSize()
n.InstallSnapshot(js.metaSnapshot())
}

case <-t.C:
Expand Down Expand Up @@ -1274,7 +1311,6 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
// Do removals first.
for _, sa := range saDel {
js.setStreamAssignmentRecovering(sa)

if isRecovering {
key := sa.recoveryKey()
ru.removeStreams[key] = sa
Expand Down Expand Up @@ -1896,7 +1932,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
} else if err != errNoSnapAvailable {
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
}
}
Expand Down Expand Up @@ -2987,6 +3023,18 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss

mset, err := acc.lookupStream(cfg.Name)
if err == nil && mset != nil {
// Make sure we have not had a new group assigned to us.
if osa.Group.Name != sa.Group.Name {
s.Warnf("JetStream cluster detected stream remapping for '%s > %s' from %q to %q",
acc, cfg.Name, osa.Group.Name, sa.Group.Name)
mset.removeNode()
alreadyRunning, needsNode = false, true
// Make sure to clear from original.
js.mu.Lock()
osa.Group.node = nil
js.mu.Unlock()
}

var needsSetLeader bool
if !alreadyRunning && numReplicas > 1 {
if needsNode {
Expand Down Expand Up @@ -4062,7 +4110,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
} else if err != errNoSnapAvailable {
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err)
}
}
Expand Down Expand Up @@ -7607,7 +7655,6 @@ func (mset *stream) handleClusterStreamInfoRequest(_ *subscription, c *client, _
func (mset *stream) processClusterStreamInfoRequest(reply string) {
mset.mu.RLock()
sysc, js, sa, config := mset.sysc, mset.srv.js, mset.sa, mset.cfg
stype := mset.cfg.Storage
isLeader := mset.isLeader()
mset.mu.RUnlock()

Expand All @@ -7617,9 +7664,9 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
return
}

// If we are here we are in a compromised state due to server limits let someone else answer if they can.
if !isLeader && js.limitsExceeded(stype) {
time.Sleep(100 * time.Millisecond)
// If we are not the leader let someone else possible respond first.
if !isLeader {
time.Sleep(200 * time.Millisecond)
}

si := &StreamInfo{
Expand Down
46 changes: 26 additions & 20 deletions server/raft.go
Expand Up @@ -1481,6 +1481,7 @@ func (n *raft) shutdown(shouldDelete bool) {
n.Unlock()
return
}

close(n.quit)
if c := n.c; c != nil {
var subs []*subscription
Expand Down Expand Up @@ -2346,7 +2347,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
n.Unlock()
return
}
n.debug("Snapshot sent, reset first entry to %d", lastIndex)
n.debug("Snapshot sent, reset first catchup entry to %d", lastIndex)
}
}

Expand Down Expand Up @@ -2646,14 +2647,13 @@ func (n *raft) runAsCandidate() {
// if we observe a bigger term, we should start over again or risk forming a quorum fully knowing
// someone with a better term exists. This is even the right thing to do if won == true.
n.Lock()
n.debug("Stepping down from candidate, detected higher term: %d vs %d", vresp.term, n.term)
n.term = vresp.term
n.vote = noVote
n.writeTermVote()
n.debug("Stepping down from candidate, detected higher term: %d vs %d", vresp.term, n.term)
n.stepdown.push(noLeader)
n.lxfer = false
n.Unlock()
return
}
case <-n.reqs.ch:
// Because of drain() it is possible that we get nil from popOne().
Expand Down Expand Up @@ -2746,6 +2746,13 @@ func (n *raft) truncateWAL(term, index uint64) {
os.Remove(n.snapfile)
n.snapfile = _EMPTY_
}
// Make sure to reset commit and applied if above
if n.commit > n.pindex {
n.commit = n.pindex
}
if n.applied > n.commit {
n.applied = n.commit
}
}()

if err := n.wal.Truncate(index); err != nil {
Expand All @@ -2763,10 +2770,10 @@ func (n *raft) truncateWAL(term, index uint64) {

// Set after we know we have truncated properly.
n.pterm, n.pindex = term, index

}

// Reset our WAL by truncating it back to term 0, index 0, which also
// removes any snapshot file and clamps commit/applied accordingly
// (see truncateWAL).
// Lock should be held.
func (n *raft) resetWAL() {
	n.truncateWAL(0, 0)
}
Expand All @@ -2782,7 +2789,6 @@ func (n *raft) updateLeader(newLeader string) {
// processAppendEntry will process an appendEntry.
func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.Lock()

// Don't reset here if we have been asked to assume leader position.
if !n.lxfer {
n.resetElectionTimeout()
Expand Down Expand Up @@ -2912,22 +2918,17 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
var ar *appendEntryResponse

var success bool
eae, err := n.loadEntry(ae.pindex)
// If terms mismatched, or we got an error loading, delete that entry and all others past it.
if eae != nil && ae.pterm > eae.pterm || err != nil {
// Truncate will reset our pterm and pindex. Only do so if we have an entry.
if eae != nil {
n.truncateWAL(ae.pterm, ae.pindex)
}
// Make sure to cancel any catchups in progress.
if catchingUp {
n.cancelCatchup()
}
if eae, _ := n.loadEntry(ae.pindex); eae == nil {
n.resetWAL()
} else {
// Inherit regardless.
n.pterm = ae.pterm
success = true
// If terms mismatched, or we got an error loading, delete that entry and all others past it.
// Make sure to cancel any catchups in progress.
// Truncate will reset our pterm and pindex. Only do so if we have an entry.
n.truncateWAL(ae.pterm, ae.pindex)
}
// Cancel regardless.
n.cancelCatchup()

// Create response.
ar = &appendEntryResponse{ae.pterm, ae.pindex, n.id, success, _EMPTY_}
n.Unlock()
Expand Down Expand Up @@ -3560,10 +3561,15 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {

// Only way we get to yes is through here.
voteOk := n.vote == noVote || n.vote == vr.candidate
if voteOk && vr.lastTerm >= n.pterm && vr.lastIndex >= n.pindex {
if voteOk && (vr.lastTerm > n.pterm || vr.lastTerm == n.pterm && vr.lastIndex >= n.pindex) {
vresp.granted = true
n.vote = vr.candidate
n.writeTermVote()
} else {
if vr.term >= n.term && n.vote == noVote {
n.term = vr.term
n.resetElect(randCampaignTimeout())
}
}
n.Unlock()

Expand Down