[IMPROVEMENT] General stability and bug fixes. #3999

Merged
merged 20 commits on Mar 30, 2023
Changes from 15 commits

Commits
6d43041
Bug fixes and general stability improvements.
derekcollison Mar 29, 2023
182bf6c
Bug fixes and general stability improvements.
derekcollison Mar 29, 2023
e516c47
Improvements to consumers attached to an interest retention stream.
derekcollison Mar 29, 2023
5cabc36
General improvements around handling interest retention.
derekcollison Mar 29, 2023
71af150
General improvements to interest based stream processing when acks ar…
derekcollison Mar 29, 2023
0d9f707
Additional tests to stress interest based streams with pull subscribe…
derekcollison Mar 29, 2023
52fbac6
Since we no longer store leaderTransfers, which is proper, some tests…
derekcollison Mar 29, 2023
e97ddcd
Tweak tests due to changes, make test timeouts uniform.
derekcollison Mar 29, 2023
c4da37e
Make sure consumer is valid and state was returned
derekcollison Mar 29, 2023
35d1a77
Snapshots of no length can hold state as well
derekcollison Mar 29, 2023
a9a4df8
Fix for flapping test
derekcollison Mar 29, 2023
ddfa5cd
Additional protection for bad state when rebuilding a message block
derekcollison Mar 29, 2023
6c3e64b
Always make sure cluster and meta raft node available when needed
derekcollison Mar 29, 2023
e274693
On bad or corrupt message load during commit, reset WAL vs mark write…
derekcollison Mar 29, 2023
2b89fea
Double check here if the jetstream cluster was shutdown when we relea…
derekcollison Mar 29, 2023
c77872b
Update server/jetstream_cluster.go
derekcollison Mar 29, 2023
152b25c
Update server/stream.go
derekcollison Mar 29, 2023
9a714e7
Update based on review feedback
derekcollison Mar 29, 2023
ade0e9d
Snapshot meta for this function to use in case it gets removed out fr…
derekcollison Mar 29, 2023
c546828
Moved long-running test to NoRace suite
derekcollison Mar 29, 2023
61 changes: 55 additions & 6 deletions server/consumer.go
@@ -2388,6 +2388,11 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) {

func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
o.mu.Lock()
if o.closed {
o.mu.Unlock()
return
}

var sagap uint64
var needSignal bool

@@ -2495,37 +2500,40 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
var needAck bool
var asflr, osseq uint64
var pending map[uint64]*Pending

o.mu.RLock()
defer o.mu.RUnlock()

isFiltered := o.isFiltered()
// Check if we are filtered, and if so check if this is even applicable to us.
if o.isFiltered() && o.mset != nil {
if isFiltered && o.mset != nil {
if subj == _EMPTY_ {
var svp StoreMsg
if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil {
o.mu.RUnlock()
return false
}
subj = svp.subj
}
if !o.isFilteredMatch(subj) {
o.mu.RUnlock()
return false
}
}

if isFiltered && o.mset == nil {
return false
}

if o.isLeader() {
asflr, osseq = o.asflr, o.sseq
pending = o.pending
} else {
if o.store == nil {
o.mu.RUnlock()
return false
}
state, err := o.store.BorrowState()
if err != nil || state == nil {
// Fall back to what we track internally for now.
needAck := sseq > o.asflr && !o.isFiltered()
o.mu.RUnlock()
return needAck
}
// If loading state as here, the osseq is +1.
@@ -2545,7 +2553,6 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
}
}

o.mu.RUnlock()
return needAck
}

@@ -4555,3 +4562,45 @@ func (o *consumer) isMonitorRunning() bool {
defer o.mu.Unlock()
return o.inMonitor
}

// If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent.
func (o *consumer) checkStateForInterestStream() {
o.mu.Lock()
// See if we need to process this update if our parent stream is not a limits policy stream.
mset := o.mset
shouldProcessState := mset != nil && o.retention != LimitsPolicy
if o.closed || !shouldProcessState {
o.mu.Unlock()
return
}

state, err := o.store.State()
o.mu.Unlock()

if err != nil {
return
}

// We should make sure to update the acks.
var ss StreamState
mset.store.FastState(&ss)

asflr := state.AckFloor.Stream
for seq := ss.FirstSeq; seq <= asflr; seq++ {
mset.ackMsg(o, seq)
}

o.mu.RLock()
// Re-fetch our state now that the acks above have been processed.
state, _ = o.store.State()
o.mu.RUnlock()

// If we have pending, we will need to walk through to delivered in case we missed any of those acks as well.
if state != nil && len(state.Pending) > 0 {
for seq := state.AckFloor.Stream + 1; seq <= state.Delivered.Stream; seq++ {
if _, ok := state.Pending[seq]; !ok {
mset.ackMsg(o, seq)
}
}
}
}
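
For readers skimming the diff, the following is a minimal standalone sketch of the reconciliation that the new checkStateForInterestStream performs, written against hypothetical toy types (toyStream, consumerView) rather than the server's real consumer and stream structures: ack everything up to the consumer's ack floor, then sweep the window between the ack floor and the last delivered sequence and ack anything no longer pending.

```go
package main

import "fmt"

// consumerView is a hypothetical, simplified snapshot of consumer state:
// the stream-level ack floor, the highest delivered stream sequence, and
// the stream sequences still awaiting an ack.
type consumerView struct {
	ackFloor  uint64
	delivered uint64
	pending   map[uint64]struct{}
}

// toyStream is a hypothetical interest-retention stream that only tracks
// which sequences it is still holding.
type toyStream struct {
	firstSeq uint64
	held     map[uint64]struct{}
}

// ackMsg mimics dropping a message once consumer interest is satisfied.
func (s *toyStream) ackMsg(seq uint64) {
	delete(s.held, seq)
}

// reconcile mirrors the shape of the loops in checkStateForInterestStream:
// everything up to the ack floor is acked, then any sequence between the
// ack floor and the delivered sequence that is not pending is acked too.
func reconcile(s *toyStream, c consumerView) {
	for seq := s.firstSeq; seq <= c.ackFloor; seq++ {
		s.ackMsg(seq)
	}
	for seq := c.ackFloor + 1; seq <= c.delivered; seq++ {
		if _, ok := c.pending[seq]; !ok {
			s.ackMsg(seq)
		}
	}
}

func main() {
	s := &toyStream{firstSeq: 1, held: make(map[uint64]struct{})}
	for seq := uint64(1); seq <= 10; seq++ {
		s.held[seq] = struct{}{}
	}
	c := consumerView{
		ackFloor:  4,
		delivered: 8,
		pending:   map[uint64]struct{}{6: {}}, // seq 6 is still outstanding
	}
	reconcile(s, c)
	fmt.Println(len(s.held), "messages still held") // 3: seq 6 pending, 9 and 10 undelivered
}
```

The real method additionally skips closed consumers and limits-retention streams, and reloads the consumer state between the two passes, as shown in the diff above.
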
78 changes: 52 additions & 26 deletions server/filestore.go
@@ -106,6 +106,7 @@ type psi struct {
}

type fileStore struct {
srv *Server
mu sync.RWMutex
state StreamState
ld *LostStreamData
@@ -373,6 +374,12 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
return fs, nil
}

func (fs *fileStore) registerServer(s *Server) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.srv = s
}

// Lock all existing message blocks.
// Lock held on entry.
func (fs *fileStore) lockAllMsgBlocks() {
@@ -765,6 +772,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint32) (*msgBlock, e
if ld, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}

if mb.msgs > 0 && !mb.noTrack && fs.psim != nil {
fs.populateGlobalPerSubjectInfo(mb)
// Try to dump any state we needed on recovery.
@@ -1036,7 +1044,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
rl &^= hbit
dlen := int(rl) - msgHdrSize
// Do some quick sanity checks here.
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > rlBadThresh {
if dlen < 0 || int(slen) > (dlen-8) || dlen > int(rl) || rl > rlBadThresh {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
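
The tightened check above also requires the stored subject length to fit inside the data portion with room left for what is presumably the record's trailing 8-byte checksum. Below is a hedged, self-contained sketch of this style of sanity check; the 22-byte header and 8-byte checksum constants are illustrative assumptions, not a definitive statement of the on-disk layout.

```go
package main

import (
	"errors"
	"fmt"
)

// Assumed record layout for illustration (not necessarily the server's exact
// on-disk format): a fixed header, then the subject, then the payload, then
// a trailing checksum. dlen below is everything after the header.
const (
	hdrSize      = 22 // assumed: 4B length + 8B sequence + 8B timestamp + 2B subject length
	checksumSize = 8
	maxRecordLen = 32 * 1024 * 1024
)

var errBadRecord = errors.New("bad or corrupt record")

// checkRecord mirrors the style of the sanity check in rebuildStateLocked:
// the data portion must be non-negative, large enough to hold the subject
// plus the trailing checksum, and the total record length must be sane.
func checkRecord(rl uint32, slen uint16) error {
	dlen := int(rl) - hdrSize
	if dlen < 0 || int(slen) > dlen-checksumSize || dlen > int(rl) || rl > maxRecordLen {
		return errBadRecord
	}
	return nil
}

func main() {
	fmt.Println(checkRecord(64, 10))           // <nil>: subject fits with room for the checksum
	fmt.Println(checkRecord(40, 16))           // error: 16B subject cannot fit in 18B data minus 8B checksum
	fmt.Println(checkRecord(64*1024*1024, 10)) // error: record length over the sanity threshold
}
```
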
@@ -2625,6 +2633,16 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
if secure && fs.prf != nil {
secure = false
}

if fs.state.Msgs == 0 {
var err = ErrStoreEOF
if seq <= fs.state.LastSeq {
err = ErrStoreMsgNotFound
}
fsUnlock()
return false, err
}

mb := fs.selectMsgBlock(seq)
if mb == nil {
var err = ErrStoreEOF
@@ -2637,8 +2655,8 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (

mb.mu.Lock()

// See if the sequence number is still relevant.
if seq < mb.first.seq {
// See if we are closed or the sequence number is still relevant.
if mb.closed || seq < mb.first.seq {
mb.mu.Unlock()
fsUnlock()
return false, nil
@@ -2933,6 +2951,7 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
if mb.cache == nil || slot >= len(mb.cache.idx) {
return 0, 0, false, errPartialCache
}

bi := mb.cache.idx[slot]
ri, hashChecked := (bi &^ hbit), (bi&hbit) != 0

@@ -3911,11 +3930,12 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
dlen := int(rl) - msgHdrSize

// Do some quick sanity checks here.
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > 32*1024*1024 {
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || index+rl > lbuf || rl > 32*1024*1024 {
// This means something is off.
// TODO(dlc) - Add into bad list?
return errCorruptState
}

// Clear erase bit.
seq = seq &^ ebit
// Adjust if we guessed wrong.
@@ -4756,6 +4776,9 @@ func (mb *msgBlock) writeIndexInfoLocked() error {
if err != nil {
return err
}
if fi, _ := ifd.Stat(); fi != nil {
mb.liwsz = fi.Size()
}
mb.ifd = ifd
}

@@ -5175,23 +5198,24 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
// but not including the seq parameter.
// Will return the number of purged messages.
func (fs *fileStore) Compact(seq uint64) (uint64, error) {
if seq == 0 || seq > fs.lastSeq() {
if seq == 0 {
return fs.purge(seq)
}

var purged, bytes uint64

// We have to delete interior messages.
fs.mu.Lock()
if lseq := fs.state.LastSeq; seq > lseq {
fs.mu.Unlock()
return fs.purge(seq)
}

smb := fs.selectMsgBlock(seq)
if smb == nil {
fs.mu.Unlock()
return 0, nil
}
if err := smb.loadMsgs(); err != nil {
fs.mu.Unlock()
return 0, err
}

// All msgblocks up to this one can be thrown away.
var deleted int
@@ -5218,7 +5242,12 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
var isEmpty bool

smb.mu.Lock()
// Since we loaded before we acquired our lock, double check here under lock that we have the messages loaded.
if smb.first.seq == seq {
isEmpty = smb.msgs == 0
goto SKIP
}

// Make sure we have the messages loaded.
if smb.cacheNotLoaded() {
if err = smb.loadMsgsWithLock(); err != nil {
goto SKIP
@@ -5229,7 +5258,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
if err == errDeletedMsg {
// Update dmap.
if len(smb.dmap) > 0 {
delete(smb.dmap, seq)
delete(smb.dmap, mseq)
if len(smb.dmap) == 0 {
smb.dmap = nil
}
@@ -5265,7 +5294,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {

// Check if we should reclaim the head space from this block.
// This will be optimistic only, so don't continue if we encounter any errors here.
if smb.bytes*2 < smb.rbytes {
if smb.rbytes > compactMinimum && smb.bytes*2 < smb.rbytes {
var moff uint32
moff, _, _, err = smb.slotInfo(int(smb.first.seq - smb.cache.fseq))
if err != nil || moff >= uint32(len(smb.cache.buf)) {
@@ -5299,13 +5328,13 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
}

SKIP:
smb.mu.Unlock()

if !isEmpty {
// Make sure to write out our index info.
smb.writeIndexInfo()
smb.writeIndexInfoLocked()
}

smb.mu.Unlock()

if deleted > 0 {
// Update block map.
if fs.bim != nil {
@@ -5329,7 +5358,7 @@ SKIP:
cb := fs.scb
fs.mu.Unlock()

if cb != nil {
if cb != nil && purged > 0 {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}
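
As the comment above Compact states, the call purges messages up to but not including seq and returns how many were removed. A toy in-memory analogue of just that contract (ignoring all of the block, cache, and index bookkeeping the real code handles) might look like:

```go
package main

import "fmt"

// toyStore is a hypothetical in-memory stand-in for a message store keyed by sequence.
type toyStore struct {
	msgs map[uint64][]byte
}

// compact removes every message with a sequence strictly below seq and
// returns how many were purged — the same contract the filestore Compact
// documents, without any of the on-disk block management.
func (s *toyStore) compact(seq uint64) uint64 {
	var purged uint64
	for sq := range s.msgs {
		if sq < seq {
			delete(s.msgs, sq)
			purged++
		}
	}
	return purged
}

func main() {
	s := &toyStore{msgs: map[uint64][]byte{}}
	for sq := uint64(1); sq <= 10; sq++ {
		s.msgs[sq] = []byte("payload")
	}
	fmt.Println(s.compact(7), "purged,", len(s.msgs), "remaining") // 6 purged, 4 remaining
}
```
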

@@ -5338,7 +5367,6 @@ SKIP:

// Will completely reset our store.
func (fs *fileStore) reset() error {

fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
@@ -5352,14 +5380,12 @@ func (fs *fileStore) reset() error {
var purged, bytes uint64
cb := fs.scb

if cb != nil {
for _, mb := range fs.blks {
mb.mu.Lock()
purged += mb.msgs
bytes += mb.bytes
mb.dirtyCloseWithRemove(true)
mb.mu.Unlock()
}
for _, mb := range fs.blks {
mb.mu.Lock()
purged += mb.msgs
bytes += mb.bytes
mb.dirtyCloseWithRemove(true)
mb.mu.Unlock()
}

// Reset
@@ -6749,7 +6775,7 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
defer o.mu.Unlock()

// Check to see if this is an outdated update.
if state.Delivered.Consumer < o.state.Delivered.Consumer {
if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream {
return nil
}
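
The widened guard above now drops an incoming consumer state update when either the delivered consumer sequence or the stream ack floor would move backwards. A minimal sketch of that monotonicity check, using a hypothetical cut-down state type:

```go
package main

import "fmt"

// consumerState is a hypothetical reduction of the persisted consumer state
// to the two counters the outdated-update check compares.
type consumerState struct {
	deliveredConsumer uint64
	ackFloorStream    uint64
}

// update applies incoming state only if it does not move either counter
// backwards, mirroring the early return added to consumerFileStore.Update.
func (cur *consumerState) update(incoming consumerState) bool {
	if incoming.deliveredConsumer < cur.deliveredConsumer ||
		incoming.ackFloorStream < cur.ackFloorStream {
		return false // outdated update, ignore it
	}
	*cur = incoming
	return true
}

func main() {
	cur := consumerState{deliveredConsumer: 100, ackFloorStream: 80}
	fmt.Println(cur.update(consumerState{deliveredConsumer: 101, ackFloorStream: 79})) // false: ack floor regressed
	fmt.Println(cur.update(consumerState{deliveredConsumer: 105, ackFloorStream: 90})) // true
}
```
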

4 changes: 2 additions & 2 deletions server/filestore_test.go
@@ -3668,7 +3668,7 @@ func TestFileStoreFetchPerf(t *testing.T) {
// https://github.com/nats-io/nats-server/issues/2936
func TestFileStoreCompactReclaimHeadSpace(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fcfg.BlockSize = 1024 * 1024
fcfg.BlockSize = 4 * 1024 * 1024

fs, err := newFileStore(
fcfg,
@@ -3678,7 +3678,7 @@ func TestFileStoreCompactReclaimHeadSpace(t *testing.T) {
defer fs.Stop()

// Create random bytes for payload to test for corruption vs repeated.
msg := make([]byte, 16*1024)
msg := make([]byte, 64*1024)
crand.Read(msg)

// This gives us ~63 msgs in first and ~37 in second.