Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] The meta layer should snapshot if any outstanding entries are present. #4050

Merged
merged 1 commit into from Apr 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 11 additions & 13 deletions server/jetstream_cluster.go
Expand Up @@ -1041,7 +1041,7 @@ func (js *jetStream) monitorCluster() {
// Make sure to stop the raft group on exit to prevent accidental memory bloat.
defer n.Stop()

const compactInterval = 2 * time.Minute
const compactInterval = time.Minute
t := time.NewTicker(compactInterval)
defer t.Stop()

Expand All @@ -1051,10 +1051,10 @@ func (js *jetStream) monitorCluster() {
defer lt.Stop()

var (
isLeader bool
lastSnap []byte
lastSnapTime time.Time
minSnapDelta = 10 * time.Second
isLeader bool
lastSnapTime time.Time
compactSizeMin = uint64(8 * 1024 * 1024) // 8MB
minSnapDelta = 10 * time.Second
)

// Highwayhash key for generating hashes.
Expand All @@ -1070,10 +1070,10 @@ func (js *jetStream) monitorCluster() {
if js.isMetaRecovering() {
return
}
snap := js.metaSnapshot()
if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
// For the meta layer we want to snapshot when asked if we need one or have any entries that we can compact.
if ne, _ := n.Size(); ne > 0 || n.NeedSnapshot() {
if err := n.InstallSnapshot(js.metaSnapshot()); err == nil {
lastSnapTime = time.Now()
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
}
Expand Down Expand Up @@ -1129,13 +1129,11 @@ func (js *jetStream) monitorCluster() {
// FIXME(dlc) - Deal with errors.
if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
_, nb := n.Applied(ce.Index)
if js.hasPeerEntries(ce.Entries) || didSnap || didStreamRemoval {
// Since we received one make sure we have our own since we do not store
// our meta state outside of raft.
if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) {
doSnapshot()
} else if didConsumerRemoval && time.Since(lastSnapTime) > minSnapDelta/2 {
doSnapshot()
} else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 && time.Since(lastSnapTime) > minSnapDelta {
} else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta {
doSnapshot()
}
ce.ReturnToPool()
Expand Down