Skip to content

Commit

Permalink
[FIXED] Unlock panic on start when filestore needs to remove msgs for…
Browse files Browse the repository at this point in the history
… enforcement. (#4469)

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 1, 2023
2 parents 0ec42f8 + 60fa2d8 commit 83fab5c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 19 deletions.
38 changes: 19 additions & 19 deletions server/filestore.go
Expand Up @@ -447,15 +447,16 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
}()
}

// Lock while do enforcements and removals.
fs.mu.Lock()

// Check if we have any left over tombstones to process.
if len(fs.tombs) > 0 {
fs.mu.Lock()
for _, seq := range fs.tombs {
fs.removeMsg(seq, false, false, false)
}
// Not needed after this phase.
fs.tombs = nil
fs.mu.Unlock()
}

// Limits checks and enforcement.
Expand All @@ -473,16 +474,16 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
fs.enforceMsgPerSubjectLimit()
}

// Grab first sequence for check below while we have lock.
firstSeq := fs.state.FirstSeq
fs.mu.Unlock()

// If the stream has an initial sequence number then make sure we
// have purged up until that point. We will do this only if the
// recovered first sequence number is before our configured first
// sequence. Need to do this locked as by now the age check timer
// has started.
var st StreamState
fs.mu.RLock()
fs.FastState(&st)
fs.mu.RUnlock()
if cfg.FirstSeq > 0 && st.FirstSeq <= cfg.FirstSeq {
if cfg.FirstSeq > 0 && firstSeq <= cfg.FirstSeq {
if _, err := fs.purge(cfg.FirstSeq); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1866,9 +1867,6 @@ func (fs *fileStore) recoverMsgs() error {
// that will expire alot of messages on startup.
// Should only be called on startup.
func (fs *fileStore) expireMsgsOnRecover() {
fs.mu.Lock()
defer fs.mu.Unlock()

if fs.state.Msgs == 0 {
return
}
Expand Down Expand Up @@ -3507,21 +3505,23 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
fs.kickFlushStateLoop()
}

cb := fs.scb
fs.mu.Unlock()

// Storage updates.
if cb != nil {
subj := _EMPTY_
if cb := fs.scb; cb != nil {
// If we have a callback registered we need to release lock regardless since cb might need it to lookup msg, etc.
fs.mu.Unlock()
// Storage updates.
var subj string
if sm != nil {
subj = sm.subj
}
delta := int64(msz)
cb(-1, -delta, seq, subj)
}

if !needFSLock {
fs.mu.Lock()
if !needFSLock {
fs.mu.Lock()
}
} else if needFSLock {
// We acquired it so release it.
fs.mu.Unlock()
}

return true, nil
Expand Down
29 changes: 29 additions & 0 deletions server/filestore_test.go
Expand Up @@ -5689,6 +5689,35 @@ func TestFileStoreFullStateTestSysRemovals(t *testing.T) {
})
}

func TestFileStoreRestartWithExpireAndLockingBug(t *testing.T) {
sd := t.TempDir()
scfg := StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}
fs, err := newFileStore(FileStoreConfig{StoreDir: sd}, scfg)
require_NoError(t, err)
defer fs.Stop()

// 20 total
msg := []byte("HELLO WORLD")
for i := 0; i < 10; i++ {
fs.StoreMsg("A", nil, msg)
fs.StoreMsg("B", nil, msg)
}
fs.Stop()

// Now change config underneath of so we will do expires at startup.
scfg.MaxMsgs = 15
scfg.MaxMsgsPer = 2
newCfg := FileStreamInfo{Created: fs.cfg.Created, StreamConfig: scfg}

// Replace
fs.cfg = newCfg
require_NoError(t, fs.writeStreamMeta())

fs, err = newFileStore(FileStoreConfig{StoreDir: sd}, scfg)
require_NoError(t, err)
defer fs.Stop()
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit 83fab5c

Please sign in to comment.