Skip to content

Commit

Permalink
Merge pull request #3867 from nats-io/improvements
Browse files Browse the repository at this point in the history
Improvements to Filestore
  • Loading branch information
derekcollison committed Feb 14, 2023
2 parents f3fa39c + 3bc0af7 commit 345496f
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 11 deletions.
35 changes: 26 additions & 9 deletions server/filestore.go
Expand Up @@ -30,7 +30,6 @@ import (
"net"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -169,6 +168,10 @@ type msgBlock struct {
noTrack bool
closed bool

// To avoid excessive writes when expiring cache.
// These can be big.
fssNeedsWrite bool

// Used to mock write failures.
mockWriteErr bool
}
Expand Down Expand Up @@ -1112,6 +1115,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
subj := mb.subjString(data[:slen])
mb.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
}
mb.fssNeedsWrite = true
}
}
// Advance to next record.
Expand Down Expand Up @@ -3123,11 +3127,14 @@ func (fs *fileStore) expireMsgs() {
var smv StoreMsg
var sm *StoreMsg
fs.mu.RLock()
minAge := time.Now().UnixNano() - int64(fs.cfg.MaxAge)
maxAge := int64(fs.cfg.MaxAge)
minAge := time.Now().UnixNano() - maxAge
fs.mu.RUnlock()

for sm, _ = fs.msgForSeq(0, &smv); sm != nil && sm.ts <= minAge; sm, _ = fs.msgForSeq(0, &smv) {
fs.removeMsg(sm.seq, false, true)
// Recalculate in case we are expiring a bunch.
minAge = time.Now().UnixNano() - maxAge
}

fs.mu.Lock()
Expand Down Expand Up @@ -3236,6 +3243,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
} else {
mb.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
}
mb.fssNeedsWrite = true
}

// Indexing
Expand Down Expand Up @@ -5286,6 +5294,10 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64, smp *StoreMsg)
if ss == nil {
return
}

// Mark dirty
mb.fssNeedsWrite = true

if ss.Msgs == 1 {
delete(mb.fss, subj)
return
Expand Down Expand Up @@ -5376,6 +5388,7 @@ func (mb *msgBlock) generatePerSubjectInfo(hasLock bool) error {
} else {
mb.fss[sm.subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
}
mb.fssNeedsWrite = true
}
}

Expand Down Expand Up @@ -5421,6 +5434,9 @@ func (mb *msgBlock) loadPerSubjectInfo() ([]byte, error) {
// Helper to make sure fss loaded if we are tracking.
// Lock should be held
func (mb *msgBlock) ensurePerSubjectInfoLoaded() error {
// Clear
mb.fssNeedsWrite = false

if mb.fss != nil || mb.noTrack {
return nil
}
Expand Down Expand Up @@ -5529,7 +5545,7 @@ func (mb *msgBlock) readPerSubjectInfo(hasLock bool) error {
// Lock should be held.
func (mb *msgBlock) writePerSubjectInfo() error {
// Raft groups do not have any subjects.
if len(mb.fss) == 0 || len(mb.sfn) == 0 {
if len(mb.fss) == 0 || len(mb.sfn) == 0 || !mb.fssNeedsWrite {
return nil
}
var scratch [4 * binary.MaxVarintLen64]byte
Expand Down Expand Up @@ -5560,6 +5576,11 @@ func (mb *msgBlock) writePerSubjectInfo() error {
err := os.WriteFile(mb.sfn, b.Bytes(), defaultFilePerms)
dios <- struct{}{}

// Clear write flag if no error.
if err == nil {
mb.fssNeedsWrite = false
}

return err
}

Expand Down Expand Up @@ -6446,12 +6467,8 @@ var dios chan struct{}
// Used to setup our simplistic counting semaphore using buffered channels.
// golang.org's semaphore seemed a bit heavy.
func init() {
// Minimum for blocking disk IO calls.
const minNIO = 4
nIO := runtime.GOMAXPROCS(0)
if nIO < minNIO {
nIO = minNIO
}
// Based on Go max threads of 10k, limit ourselves to a max of 1k blocking IO calls.
const nIO = 1024
dios = make(chan struct{}, nIO)
// Fill it up to start.
for i := 0; i < nIO; i++ {
Expand Down
47 changes: 47 additions & 0 deletions server/filestore_test.go
Expand Up @@ -5227,3 +5227,50 @@ func TestFileStoreStreamCompactMultiBlockSubjectInfo(t *testing.T) {
require_True(t, state.NumSubjects == 500)
})
}

// Verify the per-subject info (fss) dirty flag lifecycle: stores mark it
// dirty, cache expiration flushes and clears it, reads leave it untouched,
// and removes mark it dirty again.
func TestFileStoreOnlyWritePerSubjectInfoOnExpireWithUpdate(t *testing.T) {
	testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
		fcfg.CacheExpire = 100 * time.Millisecond

		fs, err := newFileStore(
			fcfg,
			StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage},
		)
		require_NoError(t, err)
		defer fs.Stop()

		// Populate fss with many distinct subjects.
		msg := []byte("Hello World")
		for i := 0; i < 1000; i++ {
			_, _, err := fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg)
			require_NoError(t, err)
		}

		// Peek at the first message block.
		fs.mu.RLock()
		firstBlk := fs.blks[0]
		fs.mu.RUnlock()

		fssDirty := func() bool {
			firstBlk.mu.RLock()
			dirty := firstBlk.fssNeedsWrite
			firstBlk.mu.RUnlock()
			return dirty
		}

		// The stores above marked fss dirty; expiration should flush and clear it.
		require_True(t, fssDirty())
		time.Sleep(2 * fcfg.CacheExpire)
		require_False(t, fssDirty())

		// A pure read must not mark fss dirty.
		_, err = fs.LoadMsg(1, nil)
		require_NoError(t, err)
		require_False(t, fssDirty())

		// A remove must mark it dirty again.
		_, err = fs.RemoveMsg(1)
		require_NoError(t, err)
		require_True(t, fssDirty())

		// The next expiration should write it out and clear the flag once more.
		time.Sleep(2 * fcfg.CacheExpire)
		require_False(t, fssDirty())
	})
}
13 changes: 11 additions & 2 deletions server/memstore.go
Expand Up @@ -421,14 +421,23 @@ func (ms *memStore) expireMsgs() {
for {
if sm, ok := ms.msgs[ms.state.FirstSeq]; ok && sm.ts <= minAge {
ms.deleteFirstMsgOrPanic()
// Recalculate in case we are expiring a bunch.
now = time.Now().UnixNano()
minAge = now - int64(ms.cfg.MaxAge)

} else {
if !ok {
if len(ms.msgs) == 0 {
if ms.ageChk != nil {
ms.ageChk.Stop()
ms.ageChk = nil
}
} else {
fireIn := time.Duration(sm.ts-now) + ms.cfg.MaxAge
var fireIn time.Duration
if sm == nil {
fireIn = ms.cfg.MaxAge
} else {
fireIn = time.Duration(sm.ts - minAge)
}
if ms.ageChk != nil {
ms.ageChk.Reset(fireIn)
} else {
Expand Down
61 changes: 61 additions & 0 deletions server/norace_test.go
Expand Up @@ -6139,3 +6139,64 @@ func TestNoRaceJetStreamClusterEnsureWALCompact(t *testing.T) {
t.Fatalf("Did not snapshot and compact the raft WAL, entries == %d", ne)
}
}

// Measure store throughput with MaxAge set: first while nothing is expiring,
// then again while background expiration of the first batch is underway, to
// observe the performance impact. Manual benchmark — normally skipped.
func TestNoRaceFileStoreStreamMaxAgePerformance(t *testing.T) {
	// Uncomment to run.
	skip(t)

	storeDir := t.TempDir()
	maxAge := 5 * time.Second

	fs, err := newFileStore(
		FileStoreConfig{StoreDir: storeDir},
		StreamConfig{Name: "MA",
			Subjects: []string{"foo.*"},
			MaxAge:   maxAge,
			Storage:  FileStorage},
	)
	require_NoError(t, err)
	defer fs.Stop()

	// Simulate a callback similar to consumers decrementing.
	var mu sync.RWMutex
	var pending int64

	fs.RegisterStorageUpdates(func(md, bd int64, seq uint64, subj string) {
		mu.Lock()
		defer mu.Unlock()
		pending += md
	})

	const subj = "foo.foo"
	payload := []byte("Hello World")
	num := 0

	// storeFor writes messages in batches of 100 until d has elapsed,
	// adding to the cumulative counter num, and returns the wall time taken.
	storeFor := func(d time.Duration) time.Duration {
		begin := time.Now()
		deadline := begin.Add(d)
		for time.Now().Before(deadline) {
			for i := 0; i < 100; i++ {
				_, _, err := fs.StoreMsg(subj, nil, payload)
				require_NoError(t, err)
				num++
			}
		}
		return time.Since(begin)
	}

	elapsed := storeFor(maxAge)
	fmt.Printf("Took %v to store %d\n", elapsed, num)
	fmt.Printf("%.0f msgs/sec\n", float64(num)/elapsed.Seconds())

	// Now keep running for 2x longer knowing we are expiring messages in the background.
	// We want to see the effect on performance.
	// NOTE(review): num is cumulative across both phases, so the second
	// msgs/sec figure divides the total count by only the second phase's
	// duration — confirm this is the intended report.
	elapsed = storeFor(maxAge * 2)
	fmt.Printf("Took %v to store %d\n", elapsed, num)
	fmt.Printf("%.0f msgs/sec\n", float64(num)/elapsed.Seconds())
}

0 comments on commit 345496f

Please sign in to comment.