Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to Filestore #3867

Merged
merged 3 commits into from Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 26 additions & 9 deletions server/filestore.go
Expand Up @@ -30,7 +30,6 @@ import (
"net"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -169,6 +168,10 @@ type msgBlock struct {
noTrack bool
closed bool

// To avoid excessive writes when expiring cache.
// These can be big.
fssNeedsWrite bool

// Used to mock write failures.
mockWriteErr bool
}
Expand Down Expand Up @@ -1112,6 +1115,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
subj := mb.subjString(data[:slen])
mb.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
}
mb.fssNeedsWrite = true
}
}
// Advance to next record.
Expand Down Expand Up @@ -3123,11 +3127,14 @@ func (fs *fileStore) expireMsgs() {
var smv StoreMsg
var sm *StoreMsg
fs.mu.RLock()
minAge := time.Now().UnixNano() - int64(fs.cfg.MaxAge)
maxAge := int64(fs.cfg.MaxAge)
minAge := time.Now().UnixNano() - maxAge
fs.mu.RUnlock()

for sm, _ = fs.msgForSeq(0, &smv); sm != nil && sm.ts <= minAge; sm, _ = fs.msgForSeq(0, &smv) {
fs.removeMsg(sm.seq, false, true)
// Recalculate in case we are expiring a bunch.
minAge = time.Now().UnixNano() - maxAge
}

fs.mu.Lock()
Expand Down Expand Up @@ -3236,6 +3243,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
} else {
mb.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
}
mb.fssNeedsWrite = true
}

// Indexing
Expand Down Expand Up @@ -5286,6 +5294,10 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64, smp *StoreMsg)
if ss == nil {
return
}

// Mark dirty
mb.fssNeedsWrite = true

if ss.Msgs == 1 {
delete(mb.fss, subj)
return
Expand Down Expand Up @@ -5376,6 +5388,7 @@ func (mb *msgBlock) generatePerSubjectInfo(hasLock bool) error {
} else {
mb.fss[sm.subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
}
mb.fssNeedsWrite = true
}
}

Expand Down Expand Up @@ -5421,6 +5434,9 @@ func (mb *msgBlock) loadPerSubjectInfo() ([]byte, error) {
// Helper to make sure fss loaded if we are tracking.
// Lock should be held
func (mb *msgBlock) ensurePerSubjectInfoLoaded() error {
// Clear
mb.fssNeedsWrite = false

if mb.fss != nil || mb.noTrack {
return nil
}
Expand Down Expand Up @@ -5529,7 +5545,7 @@ func (mb *msgBlock) readPerSubjectInfo(hasLock bool) error {
// Lock should be held.
func (mb *msgBlock) writePerSubjectInfo() error {
// Raft groups do not have any subjects.
if len(mb.fss) == 0 || len(mb.sfn) == 0 {
if len(mb.fss) == 0 || len(mb.sfn) == 0 || !mb.fssNeedsWrite {
return nil
}
var scratch [4 * binary.MaxVarintLen64]byte
Expand Down Expand Up @@ -5560,6 +5576,11 @@ func (mb *msgBlock) writePerSubjectInfo() error {
err := os.WriteFile(mb.sfn, b.Bytes(), defaultFilePerms)
dios <- struct{}{}

// Clear write flag if no error.
if err == nil {
mb.fssNeedsWrite = false
}

return err
}

Expand Down Expand Up @@ -6446,12 +6467,8 @@ var dios chan struct{}
// Used to setup our simplistic counting semaphore using buffered channels.
// golang.org's semaphore seemed a bit heavy.
func init() {
// Minimum for blocking disk IO calls.
const minNIO = 4
nIO := runtime.GOMAXPROCS(0)
if nIO < minNIO {
nIO = minNIO
}
// Based on Go max threads of 10k, limit ourselves to a max of 1k blocking IO calls.
const nIO = 1024
dios = make(chan struct{}, nIO)
// Fill it up to start.
for i := 0; i < nIO; i++ {
Expand Down
47 changes: 47 additions & 0 deletions server/filestore_test.go
Expand Up @@ -5227,3 +5227,50 @@ func TestFileStoreStreamCompactMultiBlockSubjectInfo(t *testing.T) {
require_True(t, state.NumSubjects == 500)
})
}

func TestFileStoreOnlyWritePerSubjectInfoOnExpireWithUpdate(t *testing.T) {
	testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
		fcfg.CacheExpire = 100 * time.Millisecond

		fs, err := newFileStore(
			fcfg,
			StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage},
		)
		require_NoError(t, err)
		defer fs.Stop()

		// Store one message per distinct subject so the per-subject info is populated.
		for i := 0; i < 1000; i++ {
			_, _, err := fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, []byte("Hello World"))
			require_NoError(t, err)
		}

		// Grab first msg block.
		fs.mu.RLock()
		mb := fs.blks[0]
		fs.mu.RUnlock()

		// Reports whether the block's per-subject info is dirty and pending a write.
		fssDirty := func() bool {
			mb.mu.RLock()
			dirty := mb.fssNeedsWrite
			mb.mu.RUnlock()
			return dirty
		}

		// Fresh writes mark the block dirty; cache expiry should flush and clear.
		require_True(t, fssDirty())
		time.Sleep(2 * fcfg.CacheExpire)
		require_False(t, fssDirty())

		// Make sure reads do not trigger an update.
		if _, err = fs.LoadMsg(1, nil); err != nil {
			require_NoError(t, err)
		}
		require_False(t, fssDirty())

		// Remove will though.
		if _, err = fs.RemoveMsg(1); err != nil {
			require_NoError(t, err)
		}
		require_True(t, fssDirty())

		// We should update then clear.
		time.Sleep(2 * fcfg.CacheExpire)
		require_False(t, fssDirty())
	})
}
13 changes: 11 additions & 2 deletions server/memstore.go
Expand Up @@ -421,14 +421,23 @@ func (ms *memStore) expireMsgs() {
for {
if sm, ok := ms.msgs[ms.state.FirstSeq]; ok && sm.ts <= minAge {
ms.deleteFirstMsgOrPanic()
// Recalculate in case we are expiring a bunch.
now = time.Now().UnixNano()
minAge = now - int64(ms.cfg.MaxAge)

} else {
if !ok {
if len(ms.msgs) == 0 {
if ms.ageChk != nil {
ms.ageChk.Stop()
ms.ageChk = nil
}
} else {
fireIn := time.Duration(sm.ts-now) + ms.cfg.MaxAge
var fireIn time.Duration
if sm == nil {
fireIn = ms.cfg.MaxAge
} else {
fireIn = time.Duration(sm.ts - minAge)
}
if ms.ageChk != nil {
ms.ageChk.Reset(fireIn)
} else {
Expand Down
61 changes: 61 additions & 0 deletions server/norace_test.go
Expand Up @@ -6139,3 +6139,64 @@ func TestNoRaceJetStreamClusterEnsureWALCompact(t *testing.T) {
t.Fatalf("Did not snapshot and compact the raft WAL, entries == %d", ne)
}
}

func TestNoRaceFileStoreStreamMaxAgePerformance(t *testing.T) {
	// Uncomment to run.
	skip(t)

	storeDir := t.TempDir()
	maxAge := 5 * time.Second

	fs, err := newFileStore(
		FileStoreConfig{StoreDir: storeDir},
		StreamConfig{Name: "MA",
			Subjects: []string{"foo.*"},
			MaxAge:   maxAge,
			Storage:  FileStorage},
	)
	require_NoError(t, err)
	defer fs.Stop()

	// Simulate a callback similar to consumers decrementing.
	var mu sync.RWMutex
	var pending int64

	fs.RegisterStorageUpdates(func(md, bd int64, seq uint64, subj string) {
		mu.Lock()
		defer mu.Unlock()
		pending += md
	})

	const subj = "foo.foo"
	var total int

	// storeFor appends messages in batches of 100 until the given duration
	// elapses, accumulating into total, and returns the elapsed wall time.
	storeFor := func(d time.Duration) time.Duration {
		begin := time.Now()
		deadline := begin.Add(d)
		for time.Now().Before(deadline) {
			// We will store in blocks of 100.
			for i := 0; i < 100; i++ {
				_, _, err := fs.StoreMsg(subj, nil, []byte("Hello World"))
				require_NoError(t, err)
				total++
			}
		}
		return time.Since(begin)
	}

	// First phase: fill for maxAge so nothing has expired yet.
	elapsed := storeFor(maxAge)
	fmt.Printf("Took %v to store %d\n", elapsed, total)
	fmt.Printf("%.0f msgs/sec\n", float64(total)/elapsed.Seconds())

	// Now keep running for 2x longer knowing we are expiring messages in the background.
	// We want to see the effect on performance.
	elapsed = storeFor(maxAge * 2)
	fmt.Printf("Took %v to store %d\n", elapsed, total)
	fmt.Printf("%.0f msgs/sec\n", float64(total)/elapsed.Seconds())
}