Skip to content

Commit

Permalink
[FIXED] The meta layer should snapshot if any outstanding entries are…
Browse files Browse the repository at this point in the history
… present. (#4050)

Fixes this test [TestJetStreamClusterDeleteAndRestoreAndRestart] which
would flap since it would not snapshot since hash was same but had
entries that would erase stream data.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Apr 14, 2023
2 parents be4999a + 093564f commit 89f6a5c
Showing 1 changed file with 11 additions and 13 deletions.
24 changes: 11 additions & 13 deletions server/jetstream_cluster.go
Expand Up @@ -1041,7 +1041,7 @@ func (js *jetStream) monitorCluster() {
// Make sure to stop the raft group on exit to prevent accidental memory bloat.
defer n.Stop()

const compactInterval = 2 * time.Minute
const compactInterval = time.Minute
t := time.NewTicker(compactInterval)
defer t.Stop()

Expand All @@ -1051,10 +1051,10 @@ func (js *jetStream) monitorCluster() {
defer lt.Stop()

var (
isLeader bool
lastSnap []byte
lastSnapTime time.Time
minSnapDelta = 10 * time.Second
isLeader bool
lastSnapTime time.Time
compactSizeMin = uint64(8 * 1024 * 1024) // 8MB
minSnapDelta = 10 * time.Second
)

// Highwayhash key for generating hashes.
Expand All @@ -1070,10 +1070,10 @@ func (js *jetStream) monitorCluster() {
if js.isMetaRecovering() {
return
}
snap := js.metaSnapshot()
if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
// For the meta layer we want to snapshot when asked if we need one or have any entries that we can compact.
if ne, _ := n.Size(); ne > 0 || n.NeedSnapshot() {
if err := n.InstallSnapshot(js.metaSnapshot()); err == nil {
lastSnapTime = time.Now()
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
}
Expand Down Expand Up @@ -1129,13 +1129,11 @@ func (js *jetStream) monitorCluster() {
// FIXME(dlc) - Deal with errors.
if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
_, nb := n.Applied(ce.Index)
if js.hasPeerEntries(ce.Entries) || didSnap || didStreamRemoval {
// Since we received one make sure we have our own since we do not store
// our meta state outside of raft.
if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) {
doSnapshot()
} else if didConsumerRemoval && time.Since(lastSnapTime) > minSnapDelta/2 {
doSnapshot()
} else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 && time.Since(lastSnapTime) > minSnapDelta {
} else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta {
doSnapshot()
}
ce.ReturnToPool()
Expand Down

0 comments on commit 89f6a5c

Please sign in to comment.