[FIXED] System stability improvements on restarts with corrupt metadata. #3934

Merged 4 commits on Mar 2, 2023
Changes from all commits
97 changes: 73 additions & 24 deletions server/jetstream_cluster.go
@@ -956,6 +956,42 @@ type recoveryUpdates struct {
updateConsumers map[string]*consumerAssignment
}

// Called after recovery of the cluster on startup to check for any orphans.
// Streams and consumers are recovered from disk, and the meta layer's mappings
// should clean them up, but under crash scenarios there could be orphans.
func (js *jetStream) checkForOrphans() {
js.mu.Lock()
defer js.mu.Unlock()

consumerName := func(o *consumer) string {
o.mu.RLock()
defer o.mu.RUnlock()
return o.name
}

s, cc := js.srv, js.cluster
s.Debugf("JetStream cluster checking for orphans")

for accName, jsa := range js.accounts {
asa := cc.streams[accName]
for stream, mset := range jsa.streams {
if sa := asa[stream]; sa == nil {
s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream)
mset.delete()
} else {
// This one is good, check consumers now.
for _, o := range mset.getConsumers() {
consumer := consumerName(o)
if sa.consumers[consumer] == nil {
s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
o.delete()
}
}
}
}
}
}

func (js *jetStream) monitorCluster() {
s, n := js.server(), js.getMetaGroup()
qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ()
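
For orientation, the orphan check added above amounts to a set difference: anything recovered from disk that the meta layer no longer assigns gets deleted. A minimal runnable sketch of that shape, using plain maps and hypothetical names rather than the server's types:

package main

import "fmt"

// orphans returns the names present in the recovered state but absent from
// the assigned map. checkForOrphans above does this per account, under
// js.mu, for streams and then for each stream's consumers, deleting what
// it finds.
func orphans(recovered, assigned map[string]bool) []string {
	var out []string
	for name := range recovered {
		if !assigned[name] {
			out = append(out, name)
		}
	}
	return out
}

func main() {
	recovered := map[string]bool{"ORDERS": true, "EVENTS": true}
	assigned := map[string]bool{"ORDERS": true}
	fmt.Println(orphans(recovered, assigned)) // [EVENTS]
}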
@@ -1001,7 +1037,7 @@ func (js *jetStream) monitorCluster() {
if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
} else if err != errNoSnapAvailable {
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
}
}
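
The added errNodeClosed check above (and its twins in the stream and consumer monitors further down) avoids logging a warning when a snapshot attempt races with node shutdown. As a sketch, the filter reduces to the following; the sentinel names come from the diff, while the helper name and error texts are illustrative:

package sketch

import "errors"

// Stand-ins for the server's sentinels, for illustration only.
var (
	errNoSnapAvailable = errors.New("no snapshot available")
	errNodeClosed      = errors.New("raft node closed")
)

// snapshotErrIsBenign reports whether a failed InstallSnapshot is expected
// rather than actionable: the node may have nothing new to snapshot, or it
// may already be closed because the server is shutting down.
func snapshotErrIsBenign(err error) bool {
	return err == errNoSnapAvailable || err == errNodeClosed
}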
@@ -1022,7 +1058,7 @@ func (js *jetStream) monitorCluster() {
return
case <-qch:
// Clean signal from the shutdown routine, so make a best-effort attempt to snapshot the meta layer.
n.InstallSnapshot(js.metaSnapshot())
doSnapshot()
// Return the signal back since shutdown will be waiting.
close(qch)
return
@@ -1050,15 +1086,18 @@ func (js *jetStream) monitorCluster() {
// Clear.
ru = nil
s.Debugf("Recovered JetStream cluster metadata")
js.checkForOrphans()
continue
}
// FIXME(dlc) - Deal with errors.
if didSnap, didStreamRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
_, nb := n.Applied(ce.Index)
if js.hasPeerEntries(ce.Entries) || didSnap || didStreamRemoval {
// Since we received one, make sure we have our own, since we do not store
// our meta state outside of raft.
doSnapshot()
} else if didConsumerRemoval && time.Since(lastSnapTime) > minSnapDelta/2 {
doSnapshot()
} else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 && time.Since(lastSnapTime) > minSnapDelta {
doSnapshot()
}
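
The new didConsumerRemoval branch above gives consumer removals their own, cheaper snapshot trigger. Pulling the three conditions into one place, the cadence looks roughly like this; minSnapDelta and the 8x growth factor come from the surrounding code, while the helper name and the constant's value here are assumptions:

package sketch

import "time"

const minSnapDelta = 10 * time.Second // illustrative value only

// shouldSnapshot sketches the meta-layer snapshot cadence applied above.
func shouldSnapshot(peerOrStreamChange, consumerRemoval bool,
	sinceLast time.Duration, appliedBytes uint64, lastSnapLen int) bool {
	switch {
	case peerOrStreamChange:
		// Peer entries or stream removals: snapshot immediately, since
		// meta state is not stored outside of raft.
		return true
	case consumerRemoval:
		// Consumer removals are frequent and cheap, so rate-limit harder.
		return sinceLast > minSnapDelta/2
	default:
		// Otherwise wait until the applied log dwarfs the last snapshot.
		return lastSnapLen > 0 && appliedBytes > uint64(lastSnapLen*8) &&
			sinceLast > minSnapDelta
	}
}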
@@ -1070,9 +1109,9 @@ func (js *jetStream) monitorCluster() {

if isLeader {
s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil)
// Optionally install a snapshot as we become leader.
doSnapshot()
// Install a snapshot as we become leader.
js.checkClusterSize()
doSnapshot()
}

case <-t.C:
@@ -1274,7 +1313,6 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
// Do removals first.
for _, sa := range saDel {
js.setStreamAssignmentRecovering(sa)

if isRecovering {
key := sa.recoveryKey()
ru.removeStreams[key] = sa
@@ -1539,8 +1577,8 @@ func (ca *consumerAssignment) recoveryKey() string {
return ca.Client.serviceAccount() + ksep + ca.Stream + ksep + ca.Name
}

func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, error) {
var didSnap, didRemove bool
func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, bool, error) {
var didSnap, didRemoveStream, didRemoveConsumer bool
isRecovering := js.isMetaRecovering()

for _, e := range entries {
@@ -1562,20 +1600,20 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
sa, err := decodeStreamAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, didRemove, err
return didSnap, didRemoveStream, didRemoveConsumer, err
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
delete(ru.removeStreams, sa.recoveryKey())
}
if js.processStreamAssignment(sa) {
didRemove = true
didRemoveStream = true
}
case removeStreamOp:
sa, err := decodeStreamAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, didRemove, err
return didSnap, didRemoveStream, didRemoveConsumer, err
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
@@ -1584,13 +1622,13 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
delete(ru.updateStreams, key)
} else {
js.processStreamRemoval(sa)
didRemove = true
didRemoveStream = true
}
case assignConsumerOp:
ca, err := decodeConsumerAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
return didSnap, didRemove, err
return didSnap, didRemoveStream, didRemoveConsumer, err
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
@@ -1604,7 +1642,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
ca, err := decodeConsumerAssignmentCompressed(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode compressed consumer assignment: %q", buf[1:])
return didSnap, didRemove, err
return didSnap, didRemoveStream, didRemoveConsumer, err
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
@@ -1618,7 +1656,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
ca, err := decodeConsumerAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
return didSnap, didRemove, err
return didSnap, didRemoveStream, didRemoveConsumer, err
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
@@ -1627,13 +1665,13 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
delete(ru.updateConsumers, key)
} else {
js.processConsumerRemoval(ca)
didRemove = true
didRemoveConsumer = true
}
case updateStreamOp:
sa, err := decodeStreamAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, didRemove, err
return didSnap, didRemoveStream, didRemoveConsumer, err
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
@@ -1648,7 +1686,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
}
}
}
return didSnap, didRemove, nil
return didSnap, didRemoveStream, didRemoveConsumer, nil
}

func (rg *raftGroup) isMember(id string) bool {
@@ -1896,7 +1934,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
} else if err != errNoSnapAvailable {
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
}
}
@@ -2987,6 +3025,18 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss

mset, err := acc.lookupStream(cfg.Name)
if err == nil && mset != nil {
// Make sure we have not had a new group assigned to us.
if osa.Group.Name != sa.Group.Name {
s.Warnf("JetStream cluster detected stream remapping for '%s > %s' from %q to %q",
acc, cfg.Name, osa.Group.Name, sa.Group.Name)
mset.removeNode()
alreadyRunning, needsNode = false, true
// Make sure to clear from original.
js.mu.Lock()
osa.Group.node = nil
js.mu.Unlock()
}

var needsSetLeader bool
if !alreadyRunning && numReplicas > 1 {
if needsNode {
@@ -4062,7 +4112,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
} else if err != errNoSnapAvailable {
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err)
}
}
@@ -7607,7 +7657,6 @@ func (mset *stream) handleClusterStreamInfoRequest(_ *subscription, c *client, _
func (mset *stream) processClusterStreamInfoRequest(reply string) {
mset.mu.RLock()
sysc, js, sa, config := mset.sysc, mset.srv.js, mset.sa, mset.cfg
stype := mset.cfg.Storage
isLeader := mset.isLeader()
mset.mu.RUnlock()

@@ -7617,9 +7666,9 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
return
}

// If we are here we are in a compromised state due to server limits let someone else answer if they can.
if !isLeader && js.limitsExceeded(stype) {
time.Sleep(100 * time.Millisecond)
// If we are not the leader, let someone else possibly respond first.
if !isLeader {
time.Sleep(200 * time.Millisecond)
}

si := &StreamInfo{
53 changes: 32 additions & 21 deletions server/raft.go
@@ -1481,6 +1481,7 @@ func (n *raft) shutdown(shouldDelete bool) {
n.Unlock()
return
}

close(n.quit)
if c := n.c; c != nil {
var subs []*subscription
@@ -2346,7 +2347,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
n.Unlock()
return
}
n.debug("Snapshot sent, reset first entry to %d", lastIndex)
n.debug("Snapshot sent, reset first catchup entry to %d", lastIndex)
}
}

@@ -2646,14 +2647,13 @@ func (n *raft) runAsCandidate() {
// if we observe a bigger term, we should start over again or risk forming a quorum fully knowing
// someone with a better term exists. This is even the right thing to do if won == true.
n.Lock()
n.debug("Stepping down from candidate, detected higher term: %d vs %d", vresp.term, n.term)
n.term = vresp.term
n.vote = noVote
n.writeTermVote()
n.debug("Stepping down from candidate, detected higher term: %d vs %d", vresp.term, n.term)
n.stepdown.push(noLeader)
n.lxfer = false
n.Unlock()
return
}
case <-n.reqs.ch:
// Because of drain() it is possible that we get nil from popOne().
@@ -2746,6 +2746,13 @@ func (n *raft) truncateWAL(term, index uint64) {
os.Remove(n.snapfile)
n.snapfile = _EMPTY_
}
// Make sure to reset commit and applied if they are above the truncation point.
if n.commit > n.pindex {
n.commit = n.pindex
}
if n.applied > n.commit {
n.applied = n.commit
}
}()

if err := n.wal.Truncate(index); err != nil {
@@ -2763,10 +2770,10 @@ func (n *raft) truncateWAL(term, index uint64) {

// Set after we know we have truncated properly.
n.pterm, n.pindex = term, index

}

// Reset our WAL.
// Lock should be held.
func (n *raft) resetWAL() {
n.truncateWAL(0, 0)
}
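
The clamp added to truncateWAL above enforces two invariants that a corrupt or shortened log could otherwise violate: commit never points past the end of the log, and applied never leads commit. A runnable restatement, with field names taken from the diff:

package main

import "fmt"

// clampCursors restates the truncation invariant: after the log is cut
// back to pindex, commit cannot exceed pindex, and applied cannot exceed
// commit.
func clampCursors(pindex, commit, applied uint64) (uint64, uint64) {
	if commit > pindex {
		commit = pindex
	}
	if applied > commit {
		applied = commit
	}
	return commit, applied
}

func main() {
	// A log truncated back to index 40 with commit=50 and applied=45 must
	// clamp both cursors to 40.
	fmt.Println(clampCursors(40, 50, 45)) // 40 40
}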
@@ -2782,7 +2789,6 @@ func (n *raft) updateLeader(newLeader string) {
// processAppendEntry will process an appendEntry.
func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.Lock()

// Don't reset here if we have been asked to assume leader position.
if !n.lxfer {
n.resetElectionTimeout()
@@ -2912,22 +2918,17 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
var ar *appendEntryResponse

var success bool
eae, err := n.loadEntry(ae.pindex)
// If terms mismatched, or we got an error loading, delete that entry and all others past it.
if eae != nil && ae.pterm > eae.pterm || err != nil {
// Truncate will reset our pterm and pindex. Only do so if we have an entry.
if eae != nil {
n.truncateWAL(ae.pterm, ae.pindex)
}
// Make sure to cancel any catchups in progress.
if catchingUp {
n.cancelCatchup()
}
if eae, _ := n.loadEntry(ae.pindex); eae == nil {
n.resetWAL()
} else {
// Inherit regardless.
n.pterm = ae.pterm
success = true
// If terms mismatched, or we got an error loading, delete that entry and all others past it.
// Make sure to cancel any catchups in progress.
// Truncate will reset our pterm and pindex. Only do so if we have an entry.
n.truncateWAL(ae.pterm, ae.pindex)
}
// Cancel regardless.
n.cancelCatchup()

// Create response.
ar = &appendEntryResponse{ae.pterm, ae.pindex, n.id, success, _EMPTY_}
n.Unlock()
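
The rewrite above simplifies divergence handling: on any mismatch at the leader's previous index, the follower now always cancels a catchup in progress, repairs its WAL (a full reset if the local entry cannot even be loaded, a truncation otherwise), and leaves success as false so the leader retries from an earlier point. A compact restatement under assumed names:

package sketch

// repairDivergentLog mirrors the mismatch path above. haveLocal reports
// whether the follower could load its entry at the leader's previous
// index; reset wipes the WAL, truncate cuts it back to the leader's
// previous term and index. The false return is the NACK that prompts the
// leader to catch this follower up from further back in its log.
func repairDivergentLog(haveLocal bool, reset, truncate, cancelCatchup func()) (success bool) {
	if haveLocal {
		truncate()
	} else {
		reset()
	}
	cancelCatchup()
	return false
}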
@@ -3384,7 +3385,12 @@ func (n *raft) setWriteErrLocked(err error) {
}
// Ignore non-write errors.
if err != nil {
if err == ErrStoreClosed || err == ErrStoreEOF || err == ErrInvalidSequence || err == ErrStoreMsgNotFound || err == errNoPending {
if err == ErrStoreClosed ||
err == ErrStoreEOF ||
err == ErrInvalidSequence ||
err == ErrStoreMsgNotFound ||
err == errNoPending ||
err == errPartialCache {
return
}
// If this is a not found error, report but do not disable.
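
With errPartialCache joining the list, the chain of == comparisons keeps growing; the same benign-error check reads naturally as a switch. A sketch, with the sentinels from the diff stubbed out so the fragment stands alone:

package sketch

import "errors"

// Stand-ins for the store sentinels named in the diff, for illustration.
var (
	ErrStoreClosed      = errors.New("store closed")
	ErrStoreEOF         = errors.New("store EOF")
	ErrInvalidSequence  = errors.New("invalid sequence")
	ErrStoreMsgNotFound = errors.New("message not found")
	errNoPending        = errors.New("no pending")
	errPartialCache     = errors.New("partial cache")
)

// benignWriteErr mirrors the filter above: none of these indicate a bad
// disk, so they should not latch the raft group into a write-error state.
func benignWriteErr(err error) bool {
	switch err {
	case ErrStoreClosed, ErrStoreEOF, ErrInvalidSequence,
		ErrStoreMsgNotFound, errNoPending, errPartialCache:
		return true
	}
	return false
}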
@@ -3560,10 +3566,15 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {

// Only way we get to yes is through here.
voteOk := n.vote == noVote || n.vote == vr.candidate
if voteOk && vr.lastTerm >= n.pterm && vr.lastIndex >= n.pindex {
if voteOk && (vr.lastTerm > n.pterm || vr.lastTerm == n.pterm && vr.lastIndex >= n.pindex) {
vresp.granted = true
n.vote = vr.candidate
n.writeTermVote()
} else {
if vr.term >= n.term && n.vote == noVote {
n.term = vr.term
n.resetElect(randCampaignTimeout())
}
}
n.Unlock()

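The reworked vote condition above implements the Raft election restriction (Raft paper, section 5.4.1): grant only if the candidate's log is at least as up to date as ours, comparing last term first and log length only on a tie. The previous check, vr.lastTerm >= n.pterm && vr.lastIndex >= n.pindex, could wrongly reject a candidate whose log had a higher last term but fewer entries, stalling elections. A minimal sketch of the corrected predicate:

package main

import "fmt"

// logUpToDate applies the Raft up-to-dateness rule used above: a higher
// last term wins outright; on equal last terms, the longer (or equal) log
// is at least as up to date.
func logUpToDate(candTerm, candIndex, ourTerm, ourIndex uint64) bool {
	if candTerm != ourTerm {
		return candTerm > ourTerm
	}
	return candIndex >= ourIndex
}

func main() {
	// Higher last term wins even with a shorter log; the old check would
	// have rejected this candidate.
	fmt.Println(logUpToDate(5, 10, 4, 20)) // true
	// Same last term but shorter log loses.
	fmt.Println(logUpToDate(4, 15, 4, 20)) // false
}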