Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Unlock panic on start when filestore needs to remove msgs for enforcement. #4469

Merged
merged 2 commits into from Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 19 additions & 19 deletions server/filestore.go
Expand Up @@ -447,15 +447,16 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
}()
}

// Lock while do enforcements and removals.
fs.mu.Lock()

// Check if we have any left over tombstones to process.
if len(fs.tombs) > 0 {
fs.mu.Lock()
for _, seq := range fs.tombs {
fs.removeMsg(seq, false, false, false)
}
// Not needed after this phase.
fs.tombs = nil
fs.mu.Unlock()
}

// Limits checks and enforcement.
Expand All @@ -473,16 +474,16 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
fs.enforceMsgPerSubjectLimit()
}

// Grab first sequence for check below while we have lock.
firstSeq := fs.state.FirstSeq
fs.mu.Unlock()

// If the stream has an initial sequence number then make sure we
// have purged up until that point. We will do this only if the
// recovered first sequence number is before our configured first
// sequence. Need to do this locked as by now the age check timer
// has started.
var st StreamState
fs.mu.RLock()
fs.FastState(&st)
fs.mu.RUnlock()
if cfg.FirstSeq > 0 && st.FirstSeq <= cfg.FirstSeq {
if cfg.FirstSeq > 0 && firstSeq <= cfg.FirstSeq {
if _, err := fs.purge(cfg.FirstSeq); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1867,9 +1868,6 @@ func (fs *fileStore) recoverMsgs() error {
// that will expire alot of messages on startup.
// Should only be called on startup.
func (fs *fileStore) expireMsgsOnRecover() {
fs.mu.Lock()
defer fs.mu.Unlock()

if fs.state.Msgs == 0 {
return
}
Expand Down Expand Up @@ -3508,21 +3506,23 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
fs.kickFlushStateLoop()
}

cb := fs.scb
fs.mu.Unlock()

// Storage updates.
if cb != nil {
subj := _EMPTY_
if cb := fs.scb; cb != nil {
// If we have a callback registered we need to release lock regardless since cb might need it to lookup msg, etc.
fs.mu.Unlock()
// Storage updates.
var subj string
if sm != nil {
subj = sm.subj
}
delta := int64(msz)
cb(-1, -delta, seq, subj)
}

if !needFSLock {
fs.mu.Lock()
if !needFSLock {
fs.mu.Lock()
}
} else if needFSLock {
// We acquired it so release it.
fs.mu.Unlock()
}

return true, nil
Expand Down
29 changes: 29 additions & 0 deletions server/filestore_test.go
Expand Up @@ -5689,6 +5689,35 @@ func TestFileStoreFullStateTestSysRemovals(t *testing.T) {
})
}

func TestFileStoreRestartWithExpireAndLockingBug(t *testing.T) {
sd := t.TempDir()
scfg := StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}
fs, err := newFileStore(FileStoreConfig{StoreDir: sd}, scfg)
require_NoError(t, err)
defer fs.Stop()

// 20 total
msg := []byte("HELLO WORLD")
for i := 0; i < 10; i++ {
fs.StoreMsg("A", nil, msg)
fs.StoreMsg("B", nil, msg)
}
fs.Stop()

// Now change config underneath of so we will do expires at startup.
scfg.MaxMsgs = 15
scfg.MaxMsgsPer = 2
newCfg := FileStreamInfo{Created: fs.cfg.Created, StreamConfig: scfg}

// Replace
fs.cfg = newCfg
require_NoError(t, fs.writeStreamMeta())

fs, err = newFileStore(FileStoreConfig{StoreDir: sd}, scfg)
require_NoError(t, err)
defer fs.Stop()
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down