Skip to content

Commit

Permalink
Added ability to control sync intervals and sync always (#4483)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 4, 2023
2 parents 60e41aa + e7e8a33 commit bf7b4b8
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 67 deletions.
144 changes: 94 additions & 50 deletions server/filestore.go
Expand Up @@ -52,6 +52,8 @@ type FileStoreConfig struct {
CacheExpire time.Duration
// SyncInterval is how often we sync to disk in the background.
SyncInterval time.Duration
// SyncAlways, when set, makes the stream sync to disk on every data write instead of waiting for SyncInterval.
SyncAlways bool
// AsyncFlush allows async flush to batch write operations.
AsyncFlush bool
// Cipher is the cipher to use when encrypting.
Expand Down Expand Up @@ -186,42 +188,44 @@ type fileStore struct {
// Represents a message store block and its data.
type msgBlock struct {
// Here for 32bit systems and atomic.
first msgId
last msgId
mu sync.RWMutex
fs *fileStore
aek cipher.AEAD
bek cipher.Stream
seed []byte
nonce []byte
mfn string
mfd *os.File
cmp StoreCompression // Effective compression at the time of loading the block
liwsz int64
index uint32
bytes uint64 // User visible bytes count.
rbytes uint64 // Total bytes (raw) including deleted. Used for rolling to new blk.
msgs uint64 // User visible message count.
fss map[string]*SimpleState
kfn string
lwts int64
llts int64
lrts int64
llseq uint64
hh hash.Hash64
cache *cache
cloads uint64
cexp time.Duration
ctmr *time.Timer
werr error
dmap avl.SequenceSet
fch chan struct{}
qch chan struct{}
lchk [8]byte
loading bool
flusher bool
noTrack bool
closed bool
first msgId
last msgId
mu sync.RWMutex
fs *fileStore
aek cipher.AEAD
bek cipher.Stream
seed []byte
nonce []byte
mfn string
mfd *os.File
cmp StoreCompression // Effective compression at the time of loading the block
liwsz int64
index uint32
bytes uint64 // User visible bytes count.
rbytes uint64 // Total bytes (raw) including deleted. Used for rolling to new blk.
msgs uint64 // User visible message count.
fss map[string]*SimpleState
kfn string
lwts int64
llts int64
lrts int64
llseq uint64
hh hash.Hash64
cache *cache
cloads uint64
cexp time.Duration
ctmr *time.Timer
werr error
dmap avl.SequenceSet
fch chan struct{}
qch chan struct{}
lchk [8]byte
loading bool
flusher bool
noTrack bool
needSync bool
syncAlways bool
closed bool

// Used to mock write failures.
mockWriteErr bool
Expand Down Expand Up @@ -285,7 +289,7 @@ const (
// default cache buffer expiration
defaultCacheBufferExpiration = 5 * time.Second
// default sync interval
defaultSyncInterval = 60 * time.Second
defaultSyncInterval = 2 * time.Minute
// default idle timeout to close FDs.
closeFDsIdle = 30 * time.Second
// coalesceMinimum
Expand Down Expand Up @@ -853,7 +857,7 @@ func (fs *fileStore) noTrackSubjects() bool {

// Will init the basics for a message block.
func (fs *fileStore) initMsgBlock(index uint32) *msgBlock {
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire, noTrack: fs.noTrackSubjects()}
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire, noTrack: fs.noTrackSubjects(), syncAlways: fs.fcfg.SyncAlways}

mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = filepath.Join(mdir, fmt.Sprintf(blkScan, index))
Expand Down Expand Up @@ -2767,7 +2771,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
}
}

mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire, noTrack: fs.noTrackSubjects()}
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire, noTrack: fs.noTrackSubjects(), syncAlways: fs.fcfg.SyncAlways}

// Lock should be held to quiet race detector.
mb.mu.Lock()
Expand Down Expand Up @@ -4398,10 +4402,16 @@ func (mb *msgBlock) pendingWriteSize() int {
if mb == nil {
return 0
}

mb.mu.RLock()
defer mb.mu.RUnlock()
return mb.pendingWriteSizeLocked()
}

// How many bytes are pending to be written for this message block. Lock should be held.
func (mb *msgBlock) pendingWriteSizeLocked() int {
if mb == nil {
return 0
}
var pending int
if !mb.closed && mb.mfd != nil && mb.cache != nil {
pending = len(mb.cache.buf) - int(mb.cache.wp)
Expand Down Expand Up @@ -4675,27 +4685,49 @@ func (fs *fileStore) syncBlocks() {
fs.mu.RUnlock()

for _, mb := range blks {
// Flush anything that may be pending.
if mb.pendingWriteSize() > 0 {
mb.flushPendingMsgs()
}
// Do actual sync. Hold lock for consistency.
mb.mu.Lock()
if !mb.closed {
if mb.closed {
mb.mu.Unlock()
continue
}
if mb.needSync {
// Flush anything that may be pending.
if mb.pendingWriteSizeLocked() > 0 {
mb.flushPendingMsgsLocked()
}
if mb.mfd != nil {
mb.mfd.Sync()
} else {
fd, err := os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms)
if err != nil {
mb.mu.Unlock()
continue
}
fd.Sync()
fd.Close()
}
// See if we can close FDs due to being idle.
if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
mb.dirtyCloseWithRemove(false)
}
mb.needSync = false
}
// See if we can close FDs due to being idle.
if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
mb.dirtyCloseWithRemove(false)
}
mb.mu.Unlock()
}

fs.mu.Lock()
fs.syncTmr = time.AfterFunc(fs.fcfg.SyncInterval, fs.syncBlocks)
fn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
syncAlways := fs.fcfg.SyncAlways
fs.mu.Unlock()

if !syncAlways {
if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil {
fd.Sync()
fd.Close()
}
}
}

// Select the message block where this message should be found.
Expand Down Expand Up @@ -4957,6 +4989,13 @@ func (mb *msgBlock) flushPendingMsgsLocked() (*LostStreamData, error) {
return fsLostData, mb.werr
}

// Check if we are in sync always mode.
if mb.syncAlways {
mb.mfd.Sync()
} else {
mb.needSync = true
}

// Check for additional writes while we were writing to the disk.
moreBytes := len(mb.cache.buf) - mb.cache.wp - lob

Expand Down Expand Up @@ -6163,9 +6202,12 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
fs.state.FirstTime = time.Time{}
deleted++
} else {
// Make sure to sync changes.
smb.needSync = true
// Update fs first seq and time.
smb.first.seq = seq - 1 // Just for start condition for selectNextFirst.
smb.selectNextFirst()

fs.state.FirstSeq = smb.first.seq
fs.state.FirstTime = time.Unix(0, smb.first.ts).UTC()

Expand Down Expand Up @@ -6909,7 +6951,9 @@ func (fs *fileStore) writeFullState() error {
}
tmpName := f.Name()
defer os.Remove(tmpName)
_, err = f.Write(buf)
if _, err = f.Write(buf); err == nil && fs.fcfg.SyncAlways {
f.Sync()
}
f.Close()
if err != nil {
return err
Expand Down
37 changes: 37 additions & 0 deletions server/filestore_test.go
Expand Up @@ -5392,6 +5392,43 @@ func TestFileStoreErrPartialLoadOnSyncClose(t *testing.T) {
require_NoError(t, err)
}

// TestFileStoreSyncIntervals verifies how the needSync flag on the last
// message block reacts to a write under the two sync modes:
//   - with a short SyncInterval, a write marks the block as needing a sync and
//     the flag is cleared again once the background sync has run;
//   - with SyncAlways set, a write never leaves the flag set.
func TestFileStoreSyncIntervals(t *testing.T) {
	fcfg := FileStoreConfig{StoreDir: t.TempDir(), SyncInterval: 250 * time.Millisecond}
	fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
	require_NoError(t, err)
	defer fs.Stop()

	// checkSyncFlag reads needSync from the current last message block under
	// the proper locks. It closes over the fs variable, so it follows the
	// reassignment to the second store below.
	checkSyncFlag := func(expected bool) {
		// Report failures at the calling line rather than inside this closure.
		t.Helper()
		fs.mu.RLock()
		lmb := fs.lmb
		fs.mu.RUnlock()
		lmb.mu.RLock()
		syncNeeded := lmb.needSync
		lmb.mu.RUnlock()
		if syncNeeded != expected {
			t.Fatalf("Expected needSync to be %v", expected)
		}
	}

	checkSyncFlag(false)
	// Fail on the store error itself instead of on a confusing flag mismatch.
	_, _, err = fs.StoreMsg("Z", nil, []byte("hello"))
	require_NoError(t, err)
	checkSyncFlag(true)
	// Wait longer than the 250ms SyncInterval so the background sync has run.
	time.Sleep(400 * time.Millisecond)
	checkSyncFlag(false)
	fs.Stop()

	// Now check always
	// Use a long interval so the background sync cannot be what clears the flag.
	fcfg.SyncInterval = 10 * time.Second
	fcfg.SyncAlways = true
	fs, err = newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
	require_NoError(t, err)
	defer fs.Stop()

	checkSyncFlag(false)
	_, _, err = fs.StoreMsg("Z", nil, []byte("hello"))
	require_NoError(t, err)
	checkSyncFlag(false)
}

///////////////////////////////////////////////////////////////////////////
// New WAL based architecture tests
///////////////////////////////////////////////////////////////////////////
Expand Down
29 changes: 19 additions & 10 deletions server/jetstream.go
Expand Up @@ -38,12 +38,14 @@ import (
// JetStreamConfig determines this server's configuration.
// MaxMemory and MaxStore are in bytes.
type JetStreamConfig struct {
MaxMemory int64 `json:"max_memory"`
MaxStore int64 `json:"max_storage"`
StoreDir string `json:"store_dir,omitempty"`
Domain string `json:"domain,omitempty"`
CompressOK bool `json:"compress_ok,omitempty"`
UniqueTag string `json:"unique_tag,omitempty"`
MaxMemory int64 `json:"max_memory"`
MaxStore int64 `json:"max_storage"`
StoreDir string `json:"store_dir,omitempty"`
SyncInterval time.Duration `json:"sync_interval,omitempty"`
SyncAlways bool `json:"sync_always,omitempty"`
Domain string `json:"domain,omitempty"`
CompressOK bool `json:"compress_ok,omitempty"`
UniqueTag string `json:"unique_tag,omitempty"`
}

// Statistics about JetStream for this server.
Expand Down Expand Up @@ -490,10 +492,12 @@ func (s *Server) updateJetStreamInfoStatus(enabled bool) {
func (s *Server) restartJetStream() error {
opts := s.getOpts()
cfg := JetStreamConfig{
StoreDir: opts.StoreDir,
MaxMemory: opts.JetStreamMaxMemory,
MaxStore: opts.JetStreamMaxStore,
Domain: opts.JetStreamDomain,
StoreDir: opts.StoreDir,
SyncInterval: opts.SyncInterval,
SyncAlways: opts.SyncAlways,
MaxMemory: opts.JetStreamMaxMemory,
MaxStore: opts.JetStreamMaxStore,
Domain: opts.JetStreamDomain,
}
s.Noticef("Restarting JetStream")
err := s.EnableJetStream(&cfg)
Expand Down Expand Up @@ -2399,6 +2403,10 @@ func (s *Server) dynJetStreamConfig(storeDir string, maxStore, maxMem int64) *Je

opts := s.getOpts()

// Sync options.
jsc.SyncInterval = opts.SyncInterval
jsc.SyncAlways = opts.SyncAlways

if opts.maxStoreSet && maxStore >= 0 {
jsc.MaxStore = maxStore
} else {
Expand All @@ -2415,6 +2423,7 @@ func (s *Server) dynJetStreamConfig(storeDir string, maxStore, maxMem int64) *Je
jsc.MaxMemory = JetStreamMaxMemDefault
}
}

return jsc
}

Expand Down
50 changes: 50 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -21533,6 +21533,7 @@ func TestJetStreamChangeMaxMessagesPerSubject(t *testing.T) {
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Expand Down Expand Up @@ -21699,3 +21700,52 @@ func TestJetStreamConsumerDefaultsFromStream(t *testing.T) {
}
})
}

// TestJetStreamSyncInterval checks that the sync setting given in the
// jetstream config block is reflected both in the server options and in the
// file store config of a newly added stream.
func TestJetStreamSyncInterval(t *testing.T) {
	storeDir := t.TempDir()
	confTmpl := `
listen: 127.0.0.1:-1
jetstream: {
store_dir: %q
%s
}`

	// syncCase describes one config variation and the settings it must yield.
	type syncCase struct {
		name     string
		sync     string
		expected time.Duration
		always   bool
	}
	cases := []syncCase{
		{name: "Default", sync: _EMPTY_, expected: defaultSyncInterval, always: false},
		{name: "10s", sync: "sync_interval: 10s", expected: 10 * time.Second, always: false},
		{name: "Always", sync: "sync_interval: always", expected: defaultSyncInterval, always: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			confFile := createConfFile(t, []byte(fmt.Sprintf(confTmpl, storeDir, tc.sync)))
			s, _ := RunServerWithConfig(confFile)
			defer s.Shutdown()

			// The parsed server options must carry the expected interval.
			require_True(t, s.getOpts().SyncInterval == tc.expected)

			nc, js := jsClientConnect(t, s)
			defer nc.Close()

			streamCfg := &nats.StreamConfig{
				Name:     "TEST",
				Subjects: []string{"test.>"},
			}
			_, err := js.AddStream(streamCfg)
			require_NoError(t, err)

			mset, err := s.GlobalAccount().lookupStream("TEST")
			require_NoError(t, err)

			// Snapshot the file store's sync settings under its lock.
			store := mset.store.(*fileStore)
			store.mu.RLock()
			gotInterval, gotAlways := store.fcfg.SyncInterval, store.fcfg.SyncAlways
			store.mu.RUnlock()

			require_True(t, gotInterval == tc.expected)
			require_True(t, gotAlways == tc.always)
		})
	}
}

0 comments on commit bf7b4b8

Please sign in to comment.