Skip to content

Commit

Permalink
A simpler stream state to detect change for snapshots. (#4074)
Browse files Browse the repository at this point in the history
A stream could have a complicated state with interior deletes.
This is a simpler way to determine if we need to consider a snapshot
that involves much less time and CPU and memory.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Apr 19, 2023
2 parents c43c216 + f6195a5 commit e96ae0b
Showing 1 changed file with 19 additions and 15 deletions.
34 changes: 19 additions & 15 deletions server/jetstream_cluster.go
Expand Up @@ -1968,33 +1968,37 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
}
accName := acc.GetName()

// Hash of the last snapshot (fixed size in memory).
var lastSnap []byte
// Used to represent how we can detect a changed state quickly and without representing
// a complete and detailed state which could be costly in terms of memory, cpu and GC.
// This only entails how many messages, and the first and last sequence of the stream.
// This is all that is needed to detect a change, and we can get this from FilteredState()
// with and empty filter.
var lastState SimpleState
var lastSnapTime time.Time

// Highwayhash key for generating hashes.
key := make([]byte, 32)
rand.Read(key)

// Should only to be called from leader.
doSnapshot := func() {
if mset == nil || isRestore || time.Since(lastSnapTime) < minSnapDelta {
return
}

snap := mset.stateSnapshot()
ne, nb := n.Size()
hash := highwayhash.Sum(snap, key)
// Before we actually calculate the detailed state and encode it, let's check the
// simple state to detect any changes.
curState := mset.store.FilteredState(0, _EMPTY_)

// If the state hasn't changed but the log has gone way over
// the compaction size then we will want to compact anyway.
// This shouldn't happen for streams like it can for pull
// consumers on idle streams but better to be safe than sorry!
if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
}
ne, nb := n.Size()
if curState == lastState && ne < compactNumMin && nb < compactSizeMin {
return
}

if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil {
lastState, lastSnapTime = curState, time.Now()
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
}
}

Expand Down

0 comments on commit e96ae0b

Please sign in to comment.