Skip to content

Commit

Permalink
[IMPROVEMENT] General stability and bug fixes. (#3999)
Browse files Browse the repository at this point in the history
This PR has general improvements and fixes to filestore, raft, and the
clustering layer.

Summary

1. Additional support for preAck handling for interest based streams
when replicated acks arrive before the message itself.
2. Better handling when checking state to determine whether to remove an
interest based message.
3. Improved StepDown() and leadership transfer handling after restarts.
4. Improved voting logic for high load systems.
5. Various improvements and fixes for filestore Compact(), which is used
heavily in the raft layer when updating snapshots and the raft WAL.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Mar 30, 2023
2 parents 9cc66c0 + c546828 commit 02702e4
Show file tree
Hide file tree
Showing 11 changed files with 801 additions and 300 deletions.
61 changes: 55 additions & 6 deletions server/consumer.go
Expand Up @@ -2388,6 +2388,11 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) {

func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
o.mu.Lock()
if o.closed {
o.mu.Unlock()
return
}

var sagap uint64
var needSignal bool

Expand Down Expand Up @@ -2495,20 +2500,25 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
var needAck bool
var asflr, osseq uint64
var pending map[uint64]*Pending

o.mu.RLock()
defer o.mu.RUnlock()

isFiltered := o.isFiltered()
if isFiltered && o.mset == nil {
return false
}

// Check if we are filtered, and if so check if this is even applicable to us.
if o.isFiltered() && o.mset != nil {
if isFiltered {
if subj == _EMPTY_ {
var svp StoreMsg
if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil {
o.mu.RUnlock()
return false
}
subj = svp.subj
}
if !o.isFilteredMatch(subj) {
o.mu.RUnlock()
return false
}
}
Expand All @@ -2518,14 +2528,12 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
pending = o.pending
} else {
if o.store == nil {
o.mu.RUnlock()
return false
}
state, err := o.store.BorrowState()
if err != nil || state == nil {
// Fall back to what we track internally for now.
needAck := sseq > o.asflr && !o.isFiltered()
o.mu.RUnlock()
return needAck
}
// If loading state as here, the osseq is +1.
Expand All @@ -2545,7 +2553,6 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
}
}

o.mu.RUnlock()
return needAck
}

Expand Down Expand Up @@ -4555,3 +4562,45 @@ func (o *consumer) isMonitorRunning() bool {
defer o.mu.Unlock()
return o.inMonitor
}

// checkStateForInterestStream reconciles this consumer's ack state with its
// parent stream when the stream uses a non-limits retention policy (interest
// or workqueue). It acks any stream messages at or below our ack floor, then
// re-reads state and acks any gaps between the ack floor and the delivered
// sequence that are no longer pending.
// No-op for closed consumers, consumers with no parent stream, or limits
// policy streams.
func (o *consumer) checkStateForInterestStream() {
	o.mu.Lock()
	// See if we need to process this update if our parent stream is not a limits policy stream.
	mset := o.mset
	shouldProcessState := mset != nil && o.retention != LimitsPolicy
	if o.closed || !shouldProcessState {
		o.mu.Unlock()
		return
	}

	// Snapshot our consumer store state, then release the lock before we
	// touch the stream below to avoid holding o.mu across stream calls.
	state, err := o.store.State()
	o.mu.Unlock()

	if err != nil {
		return
	}

	// We should make sure to update the acks.
	var ss StreamState
	mset.store.FastState(&ss)

	// Ack everything from the stream's first sequence up to our ack floor.
	// NOTE(review): assumes mset.ackMsg is idempotent for already-acked
	// sequences — confirm against its definition elsewhere in this file.
	asflr := state.AckFloor.Stream
	for seq := ss.FirstSeq; seq <= asflr; seq++ {
		mset.ackMsg(o, seq)
	}

	o.mu.RLock()
	// Re-read state under the read lock; the acks above may have changed it.
	state, _ = o.store.State()
	o.mu.RUnlock()

	// If we have pending, we will need to walk through to delivered in case we missed any of those acks as well.
	if state != nil && len(state.Pending) > 0 {
		for seq := state.AckFloor.Stream + 1; seq <= state.Delivered.Stream; seq++ {
			if _, ok := state.Pending[seq]; !ok {
				mset.ackMsg(o, seq)
			}
		}
	}
}
78 changes: 52 additions & 26 deletions server/filestore.go
Expand Up @@ -106,6 +106,7 @@ type psi struct {
}

type fileStore struct {
srv *Server
mu sync.RWMutex
state StreamState
ld *LostStreamData
Expand Down Expand Up @@ -373,6 +374,12 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
return fs, nil
}

// registerServer records the owning server on this filestore under the
// store lock, making it available to later filestore operations.
func (fs *fileStore) registerServer(s *Server) {
	fs.mu.Lock()
	fs.srv = s
	fs.mu.Unlock()
}

// Lock all existing message blocks.
// Lock held on entry.
func (fs *fileStore) lockAllMsgBlocks() {
Expand Down Expand Up @@ -765,6 +772,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint32) (*msgBlock, e
if ld, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}

if mb.msgs > 0 && !mb.noTrack && fs.psim != nil {
fs.populateGlobalPerSubjectInfo(mb)
// Try to dump any state we needed on recovery.
Expand Down Expand Up @@ -1036,7 +1044,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
rl &^= hbit
dlen := int(rl) - msgHdrSize
// Do some quick sanity checks here.
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > rlBadThresh {
if dlen < 0 || int(slen) > (dlen-8) || dlen > int(rl) || rl > rlBadThresh {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
Expand Down Expand Up @@ -2625,6 +2633,16 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
if secure && fs.prf != nil {
secure = false
}

if fs.state.Msgs == 0 {
var err = ErrStoreEOF
if seq <= fs.state.LastSeq {
err = ErrStoreMsgNotFound
}
fsUnlock()
return false, err
}

mb := fs.selectMsgBlock(seq)
if mb == nil {
var err = ErrStoreEOF
Expand All @@ -2637,8 +2655,8 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (

mb.mu.Lock()

// See if the sequence number is still relevant.
if seq < mb.first.seq {
// See if we are closed or the sequence number is still relevant.
if mb.closed || seq < mb.first.seq {
mb.mu.Unlock()
fsUnlock()
return false, nil
Expand Down Expand Up @@ -2933,6 +2951,7 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
if mb.cache == nil || slot >= len(mb.cache.idx) {
return 0, 0, false, errPartialCache
}

bi := mb.cache.idx[slot]
ri, hashChecked := (bi &^ hbit), (bi&hbit) != 0

Expand Down Expand Up @@ -3911,11 +3930,12 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
dlen := int(rl) - msgHdrSize

// Do some quick sanity checks here.
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > 32*1024*1024 {
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || index+rl > lbuf || rl > 32*1024*1024 {
// This means something is off.
// TODO(dlc) - Add into bad list?
return errCorruptState
}

// Clear erase bit.
seq = seq &^ ebit
// Adjust if we guessed wrong.
Expand Down Expand Up @@ -4756,6 +4776,9 @@ func (mb *msgBlock) writeIndexInfoLocked() error {
if err != nil {
return err
}
if fi, _ := ifd.Stat(); fi != nil {
mb.liwsz = fi.Size()
}
mb.ifd = ifd
}

Expand Down Expand Up @@ -5175,23 +5198,24 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
// but not including the seq parameter.
// Will return the number of purged messages.
func (fs *fileStore) Compact(seq uint64) (uint64, error) {
if seq == 0 || seq > fs.lastSeq() {
if seq == 0 {
return fs.purge(seq)
}

var purged, bytes uint64

// We have to delete interior messages.
fs.mu.Lock()
if lseq := fs.state.LastSeq; seq > lseq {
fs.mu.Unlock()
return fs.purge(seq)
}

smb := fs.selectMsgBlock(seq)
if smb == nil {
fs.mu.Unlock()
return 0, nil
}
if err := smb.loadMsgs(); err != nil {
fs.mu.Unlock()
return 0, err
}

// All msgblocks up to this one can be thrown away.
var deleted int
Expand All @@ -5218,7 +5242,12 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
var isEmpty bool

smb.mu.Lock()
// Since we loaded before we acquired our lock, double check here under lock that we have the messages loaded.
if smb.first.seq == seq {
isEmpty = smb.msgs == 0
goto SKIP
}

// Make sure we have the messages loaded.
if smb.cacheNotLoaded() {
if err = smb.loadMsgsWithLock(); err != nil {
goto SKIP
Expand All @@ -5229,7 +5258,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
if err == errDeletedMsg {
// Update dmap.
if len(smb.dmap) > 0 {
delete(smb.dmap, seq)
delete(smb.dmap, mseq)
if len(smb.dmap) == 0 {
smb.dmap = nil
}
Expand Down Expand Up @@ -5265,7 +5294,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {

// Check if we should reclaim the head space from this block.
// This will be optimistic only, so don't continue if we encounter any errors here.
if smb.bytes*2 < smb.rbytes {
if smb.rbytes > compactMinimum && smb.bytes*2 < smb.rbytes {
var moff uint32
moff, _, _, err = smb.slotInfo(int(smb.first.seq - smb.cache.fseq))
if err != nil || moff >= uint32(len(smb.cache.buf)) {
Expand Down Expand Up @@ -5299,13 +5328,13 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
}

SKIP:
smb.mu.Unlock()

if !isEmpty {
// Make sure to write out our index info.
smb.writeIndexInfo()
smb.writeIndexInfoLocked()
}

smb.mu.Unlock()

if deleted > 0 {
// Update block map.
if fs.bim != nil {
Expand All @@ -5329,7 +5358,7 @@ SKIP:
cb := fs.scb
fs.mu.Unlock()

if cb != nil {
if cb != nil && purged > 0 {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}

Expand All @@ -5338,7 +5367,6 @@ SKIP:

// Will completely reset our store.
func (fs *fileStore) reset() error {

fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
Expand All @@ -5352,14 +5380,12 @@ func (fs *fileStore) reset() error {
var purged, bytes uint64
cb := fs.scb

if cb != nil {
for _, mb := range fs.blks {
mb.mu.Lock()
purged += mb.msgs
bytes += mb.bytes
mb.dirtyCloseWithRemove(true)
mb.mu.Unlock()
}
for _, mb := range fs.blks {
mb.mu.Lock()
purged += mb.msgs
bytes += mb.bytes
mb.dirtyCloseWithRemove(true)
mb.mu.Unlock()
}

// Reset
Expand Down Expand Up @@ -6749,7 +6775,7 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
defer o.mu.Unlock()

// Check to see if this is an outdated update.
if state.Delivered.Consumer < o.state.Delivered.Consumer {
if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream {
return nil
}

Expand Down

0 comments on commit 02702e4

Please sign in to comment.