Merge pull request #3934 from nats-io/corrupt-meta
[FIXED] System stability improvements on restarts with corrupt metadata.
derekcollison committed Mar 2, 2023
2 parents 4e3b983 + 2beca1a · commit 2d76c61
Showing 2 changed files with 105 additions and 45 deletions.
97 changes: 73 additions & 24 deletions server/jetstream_cluster.go
@@ -956,6 +956,42 @@ type recoveryUpdates struct {
updateConsumers map[string]*consumerAssignment
}

// Called after recovery of the cluster on startup to check for any orphans.
// Streams and consumers are recovered from disk, and the meta layer's mappings
// should clean them up, but under crash scenarios there could be orphans.
func (js *jetStream) checkForOrphans() {
js.mu.Lock()
defer js.mu.Unlock()

consumerName := func(o *consumer) string {
o.mu.RLock()
defer o.mu.RUnlock()
return o.name
}

s, cc := js.srv, js.cluster
s.Debugf("JetStream cluster checking for orphans")

for accName, jsa := range js.accounts {
asa := cc.streams[accName]
for stream, mset := range jsa.streams {
if sa := asa[stream]; sa == nil {
s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream)
mset.delete()
} else {
// This one is good, check consumers now.
for _, o := range mset.getConsumers() {
consumer := consumerName(o)
if sa.consumers[consumer] == nil {
s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
o.delete()
}
}
}
}
}
}

func (js *jetStream) monitorCluster() {
s, n := js.server(), js.getMetaGroup()
qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ()
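The checkForOrphans pass added above reconciles what was recovered from disk against the meta layer's stream and consumer assignments: the assignments are treated as the source of truth, and anything running without one is deleted. A minimal, self-contained sketch of that reconciliation pattern, using simplified hypothetical maps rather than the server's actual jetStream, stream, and consumer types:

package main

import "fmt"

// Hypothetical stand-ins for the meta layer's assignments and the state
// recovered from disk on startup.
type assignments struct {
	streams map[string]map[string]bool // account -> stream -> assigned
}

type recovered struct {
	streams map[string][]string // account -> stream names found on disk
}

// findOrphans returns every recovered stream that has no assignment,
// mirroring the shape of the orphan check: under a clean shutdown the two
// views agree, but after a crash the recovered side can hold leftovers.
func findOrphans(meta assignments, local recovered) []string {
	var orphans []string
	for acc, names := range local.streams {
		assigned := meta.streams[acc]
		for _, name := range names {
			if !assigned[name] { // nil map lookups are safe and report false
				orphans = append(orphans, acc+" > "+name)
			}
		}
	}
	return orphans
}

func main() {
	meta := assignments{streams: map[string]map[string]bool{
		"ACC": {"ORDERS": true},
	}}
	local := recovered{streams: map[string][]string{
		"ACC": {"ORDERS", "STALE"}, // STALE survived a crash but lost its assignment
	}}
	fmt.Println(findOrphans(meta, local)) // [ACC > STALE]
}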
@@ -1001,7 +1037,7 @@ func (js *jetStream) monitorCluster() {
if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
} else if err != errNoSnapAvailable {
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
}
}
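This hunk, and the matching ones in monitorStream and monitorConsumer further down, add errNodeClosed to the errors that are safe to ignore, so snapshot attempts racing a node shutdown no longer log warnings. The surrounding logic also installs a snapshot only when its content hash differs from the last one installed. A small sketch of that hash-gating idea, using crypto/sha256 purely as a stand-in for the server's keyed highwayhash:

package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

// installIfChanged installs a snapshot only when its contents differ from
// the last successful install, and returns the hash to remember next time.
func installIfChanged(snap, lastHash []byte, install func([]byte) error) ([]byte, error) {
	h := sha256.Sum256(snap)
	if bytes.Equal(h[:], lastHash) {
		return lastHash, nil // unchanged, nothing to do
	}
	if err := install(snap); err != nil {
		return lastHash, err // keep the old hash so we retry later
	}
	return h[:], nil
}

func main() {
	install := func(b []byte) error { fmt.Printf("installed %d bytes\n", len(b)); return nil }

	var last []byte
	last, _ = installIfChanged([]byte("meta-state-v1"), last, install) // installs
	last, _ = installIfChanged([]byte("meta-state-v1"), last, install) // skipped, same hash
	last, _ = installIfChanged([]byte("meta-state-v2"), last, install) // installs again
	_ = last
}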
@@ -1022,7 +1058,7 @@ func (js *jetStream) monitorCluster() {
return
case <-qch:
// Clean signal from shutdown routine so do best effort attempt to snapshot meta layer.
n.InstallSnapshot(js.metaSnapshot())
doSnapshot()
// Return the signal back since shutdown will be waiting.
close(qch)
return
@@ -1050,15 +1086,18 @@ func (js *jetStream) monitorCluster() {
// Clear.
ru = nil
s.Debugf("Recovered JetStream cluster metadata")
js.checkForOrphans()
continue
}
// FIXME(dlc) - Deal with errors.
if didSnap, didStreamRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
_, nb := n.Applied(ce.Index)
if js.hasPeerEntries(ce.Entries) || didSnap || didStreamRemoval {
// Since we received one make sure we have our own since we do not store
// our meta state outside of raft.
doSnapshot()
} else if didConsumerRemoval && time.Since(lastSnapTime) > minSnapDelta/2 {
doSnapshot()
} else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 && time.Since(lastSnapTime) > minSnapDelta {
doSnapshot()
}
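Once recovery completes, the loop above also gets a graded snapshot cadence: peer entries, an applied snapshot, or a stream removal trigger a snapshot immediately, a consumer removal snapshots only if at least minSnapDelta/2 has elapsed since the last one, and otherwise a snapshot waits for both log growth and the full delay. A rough sketch of that decision; the 10-second value and the boolean growth test are simplifying assumptions, not the server's exact constants:

package main

import (
	"fmt"
	"time"
)

// Assumed value for illustration; the real constant lives in raft.go.
const minSnapDelta = 10 * time.Second

// shouldSnapshot mirrors the ordering of the checks: structural changes win,
// consumer removals are rate limited to half the usual delay, and everything
// else waits for log growth plus the full delay.
func shouldSnapshot(lastSnapTime time.Time, structuralChange, consumerRemoval, logGrewEnough bool) bool {
	switch {
	case structuralChange:
		return true
	case consumerRemoval:
		return time.Since(lastSnapTime) > minSnapDelta/2
	default:
		return logGrewEnough && time.Since(lastSnapTime) > minSnapDelta
	}
}

func main() {
	last := time.Now().Add(-6 * time.Second)
	fmt.Println(shouldSnapshot(last, false, true, false)) // true: consumer removal, >5s since last snapshot
	fmt.Println(shouldSnapshot(last, false, false, true)) // false: log grew, but the full delay has not elapsed
	fmt.Println(shouldSnapshot(last, true, false, false)) // true: peer change or stream removal snapshots now
}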
@@ -1070,9 +1109,9 @@ func (js *jetStream) monitorCluster() {

if isLeader {
s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil)
// Optionally install a snapshot as we become leader.
doSnapshot()
// Install a snapshot as we become leader.
js.checkClusterSize()
doSnapshot()
}

case <-t.C:
@@ -1274,7 +1313,6 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
// Do removals first.
for _, sa := range saDel {
js.setStreamAssignmentRecovering(sa)

if isRecovering {
key := sa.recoveryKey()
ru.removeStreams[key] = sa
@@ -1539,8 +1577,8 @@ func (ca *consumerAssignment) recoveryKey() string {
return ca.Client.serviceAccount() + ksep + ca.Stream + ksep + ca.Name
}

func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, error) {
var didSnap, didRemove bool
func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, bool, error) {
var didSnap, didRemoveStream, didRemoveConsumer bool
isRecovering := js.isMetaRecovering()

for _, e := range entries {
@@ -1562,20 +1600,20 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
sa, err := decodeStreamAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, didRemove, err
return didSnap, didRemoveStream, didRemoveConsumer, err
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
delete(ru.removeStreams, sa.recoveryKey())
}
if js.processStreamAssignment(sa) {
didRemove = true
didRemoveStream = true
}
case removeStreamOp:
sa, err := decodeStreamAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, didRemove, err
return didSnap, didRemoveStream, didRemoveConsumer, err
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
@@ -1584,13 +1622,13 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
delete(ru.updateStreams, key)
} else {
js.processStreamRemoval(sa)
didRemove = true
didRemoveStream = true
}
case assignConsumerOp:
ca, err := decodeConsumerAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
return didSnap, didRemove, err
return didSnap, didRemoveStream, didRemoveConsumer, err
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
@@ -1604,7 +1642,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
ca, err := decodeConsumerAssignmentCompressed(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode compressed consumer assignment: %q", buf[1:])
return didSnap, didRemove, err
return didSnap, didRemoveStream, didRemoveConsumer, err
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
@@ -1618,7 +1656,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
ca, err := decodeConsumerAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
return didSnap, didRemove, err
return didSnap, didRemoveStream, didRemoveConsumer, err
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
@@ -1627,13 +1665,13 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
delete(ru.updateConsumers, key)
} else {
js.processConsumerRemoval(ca)
didRemove = true
didRemoveConsumer = true
}
case updateStreamOp:
sa, err := decodeStreamAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, didRemove, err
return didSnap, didRemoveStream, didRemoveConsumer, err
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
@@ -1648,7 +1686,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
}
}
}
return didSnap, didRemove, nil
return didSnap, didRemoveStream, didRemoveConsumer, nil
}

func (rg *raftGroup) isMember(id string) bool {
@@ -1896,7 +1934,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
} else if err != errNoSnapAvailable {
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
}
}
@@ -2987,6 +3025,18 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss

mset, err := acc.lookupStream(cfg.Name)
if err == nil && mset != nil {
// Make sure we have not had a new group assigned to us.
if osa.Group.Name != sa.Group.Name {
s.Warnf("JetStream cluster detected stream remapping for '%s > %s' from %q to %q",
acc, cfg.Name, osa.Group.Name, sa.Group.Name)
mset.removeNode()
alreadyRunning, needsNode = false, true
// Make sure to clear from original.
js.mu.Lock()
osa.Group.node = nil
js.mu.Unlock()
}

var needsSetLeader bool
if !alreadyRunning && numReplicas > 1 {
if needsNode {
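The block added here covers a stream that was remapped to a different Raft group while this server held stale or corrupt metadata: when the incoming assignment names a new group, the old node is detached so a fresh one gets created and the stream rejoins the right group. A simplified sketch of that check with hypothetical types:

package main

import "fmt"

// Hypothetical, trimmed-down stand-ins for a stream assignment and a
// running stream's view of its Raft group.
type assignment struct{ group string }

type runningStream struct {
	group   string
	hasNode bool
}

// applyUpdate mirrors the remapping check: a changed group name detaches the
// old node and reports that a new one is needed even if the stream was
// already running.
func (s *runningStream) applyUpdate(prev, next assignment) (needsNode bool) {
	if prev.group != next.group {
		fmt.Printf("stream remapped from %q to %q, detaching old node\n", prev.group, next.group)
		s.hasNode = false
		s.group = next.group
		return true
	}
	return !s.hasNode
}

func main() {
	s := &runningStream{group: "S-R3-abc", hasNode: true}
	needsNode := s.applyUpdate(assignment{group: "S-R3-abc"}, assignment{group: "S-R3-def"})
	fmt.Println(needsNode, s.hasNode) // true false
}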
@@ -4062,7 +4112,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
} else if err != errNoSnapAvailable {
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err)
}
}
@@ -7607,7 +7657,6 @@ func (mset *stream) handleClusterStreamInfoRequest(_ *subscription, c *client, _
func (mset *stream) processClusterStreamInfoRequest(reply string) {
mset.mu.RLock()
sysc, js, sa, config := mset.sysc, mset.srv.js, mset.sa, mset.cfg
stype := mset.cfg.Storage
isLeader := mset.isLeader()
mset.mu.RUnlock()

@@ -7617,9 +7666,9 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
return
}

// If we are here we are in a compromised state due to server limits let someone else answer if they can.
if !isLeader && js.limitsExceeded(stype) {
time.Sleep(100 * time.Millisecond)
// If we are not the leader let someone else possible respond first.
if !isLeader {
time.Sleep(200 * time.Millisecond)
}

si := &StreamInfo{
53 changes: 32 additions & 21 deletions server/raft.go
@@ -1481,6 +1481,7 @@ func (n *raft) shutdown(shouldDelete bool) {
n.Unlock()
return
}

close(n.quit)
if c := n.c; c != nil {
var subs []*subscription
@@ -2346,7 +2347,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
n.Unlock()
return
}
n.debug("Snapshot sent, reset first entry to %d", lastIndex)
n.debug("Snapshot sent, reset first catchup entry to %d", lastIndex)
}
}

@@ -2646,14 +2647,13 @@ func (n *raft) runAsCandidate() {
// if we observe a bigger term, we should start over again or risk forming a quorum fully knowing
// someone with a better term exists. This is even the right thing to do if won == true.
n.Lock()
n.debug("Stepping down from candidate, detected higher term: %d vs %d", vresp.term, n.term)
n.term = vresp.term
n.vote = noVote
n.writeTermVote()
n.debug("Stepping down from candidate, detected higher term: %d vs %d", vresp.term, n.term)
n.stepdown.push(noLeader)
n.lxfer = false
n.Unlock()
return
}
case <-n.reqs.ch:
// Because of drain() it is possible that we get nil from popOne().
@@ -2746,6 +2746,13 @@ func (n *raft) truncateWAL(term, index uint64) {
os.Remove(n.snapfile)
n.snapfile = _EMPTY_
}
// Make sure to reset commit and applied if above
if n.commit > n.pindex {
n.commit = n.pindex
}
if n.applied > n.commit {
n.applied = n.commit
}
}()

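The new deferred clamp keeps the Raft counters consistent when a truncation drops entries that had already been counted as committed or applied, restoring the usual applied <= commit <= pindex ordering. A small illustrative sketch with the field names simplified:

package main

import "fmt"

// wal keeps only the three counters relevant to the clamp; the real raft
// struct tracks much more state.
type wal struct {
	pindex  uint64 // index of the last entry physically in the log
	commit  uint64 // highest index known to be committed
	applied uint64 // highest index applied to the state machine
}

// clampAfterTruncate mirrors the added code: after the log is cut back to
// newPindex, commit and applied may point past the end of the log and must
// be pulled back with it.
func (w *wal) clampAfterTruncate(newPindex uint64) {
	w.pindex = newPindex
	if w.commit > w.pindex {
		w.commit = w.pindex
	}
	if w.applied > w.commit {
		w.applied = w.commit
	}
}

func main() {
	w := &wal{pindex: 120, commit: 115, applied: 110}
	w.clampAfterTruncate(100)        // corrupt tail removed, log now ends at 100
	fmt.Println(w.commit, w.applied) // 100 100
}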
if err := n.wal.Truncate(index); err != nil {
Expand All @@ -2763,10 +2770,10 @@ func (n *raft) truncateWAL(term, index uint64) {

// Set after we know we have truncated properly.
n.pterm, n.pindex = term, index

}

// Reset our WAL.
// Lock should be held.
func (n *raft) resetWAL() {
n.truncateWAL(0, 0)
}
@@ -2782,7 +2789,6 @@ func (n *raft) updateLeader(newLeader string) {
// processAppendEntry will process an appendEntry.
func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.Lock()

// Don't reset here if we have been asked to assume leader position.
if !n.lxfer {
n.resetElectionTimeout()
@@ -2912,22 +2918,17 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
var ar *appendEntryResponse

var success bool
eae, err := n.loadEntry(ae.pindex)
// If terms mismatched, or we got an error loading, delete that entry and all others past it.
if eae != nil && ae.pterm > eae.pterm || err != nil {
// Truncate will reset our pterm and pindex. Only do so if we have an entry.
if eae != nil {
n.truncateWAL(ae.pterm, ae.pindex)
}
// Make sure to cancel any catchups in progress.
if catchingUp {
n.cancelCatchup()
}
if eae, _ := n.loadEntry(ae.pindex); eae == nil {
n.resetWAL()
} else {
// Inherit regardless.
n.pterm = ae.pterm
success = true
// If terms mismatched, or we got an error loading, delete that entry and all others past it.
// Make sure to cancel any catchups in progress.
// Truncate will reset our pterm and pindex. Only do so if we have an entry.
n.truncateWAL(ae.pterm, ae.pindex)
}
// Cancel regardless.
n.cancelCatchup()

// Create response.
ar = &appendEntryResponse{ae.pterm, ae.pindex, n.id, success, _EMPTY_}
n.Unlock()
@@ -3384,7 +3385,12 @@ func (n *raft) setWriteErrLocked(err error) {
}
// Ignore non-write errors.
if err != nil {
if err == ErrStoreClosed || err == ErrStoreEOF || err == ErrInvalidSequence || err == ErrStoreMsgNotFound || err == errNoPending {
if err == ErrStoreClosed ||
err == ErrStoreEOF ||
err == ErrInvalidSequence ||
err == ErrStoreMsgNotFound ||
err == errNoPending ||
err == errPartialCache {
return
}
// If this is a not found report but do not disable.
@@ -3560,10 +3566,15 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {

// Only way we get to yes is through here.
voteOk := n.vote == noVote || n.vote == vr.candidate
if voteOk && vr.lastTerm >= n.pterm && vr.lastIndex >= n.pindex {
if voteOk && (vr.lastTerm > n.pterm || vr.lastTerm == n.pterm && vr.lastIndex >= n.pindex) {
vresp.granted = true
n.vote = vr.candidate
n.writeTermVote()
} else {
if vr.term >= n.term && n.vote == noVote {
n.term = vr.term
n.resetElect(randCampaignTimeout())
}
}
n.Unlock()

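The vote-granting condition now follows the standard Raft log up-to-date rule: a candidate with a strictly higher last log term wins, and on an equal term its last index must be at least ours; the old check, which required both fields to be >=, could reject a candidate whose log was newer in term but shorter in index. The added else branch also adopts a higher observed term and resets the election timer when the vote is not granted. A minimal sketch of the comparison with illustrative names:

package main

import "fmt"

// logPos is the (term, index) of the last entry in a node's log.
type logPos struct{ term, index uint64 }

// candidateUpToDate reports whether a candidate's log is at least as
// up to date as the voter's, per the Raft election restriction.
func candidateUpToDate(candidate, voter logPos) bool {
	if candidate.term != voter.term {
		return candidate.term > voter.term
	}
	return candidate.index >= voter.index
}

func main() {
	voter := logPos{term: 5, index: 100}
	fmt.Println(candidateUpToDate(logPos{term: 6, index: 10}, voter))  // true: higher last term wins
	fmt.Println(candidateUpToDate(logPos{term: 5, index: 99}, voter))  // false: same term, shorter log
	fmt.Println(candidateUpToDate(logPos{term: 4, index: 500}, voter)) // false: lower term, regardless of index
}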