Skip to content

Commit

Permalink
Allow sync intervals to be set and the ability to have all data write…
Browse files Browse the repository at this point in the history
…s synchronous.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 4, 2023
1 parent e11ddb8 commit bcb45b5
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 67 deletions.
144 changes: 94 additions & 50 deletions server/filestore.go
Expand Up @@ -52,6 +52,8 @@ type FileStoreConfig struct {
CacheExpire time.Duration
// SyncInterval is how often we sync to disk in the background.
SyncInterval time.Duration
// SyncAlways is when the stream should sync all data writes.
SyncAlways bool
// AsyncFlush allows async flush to batch write operations.
AsyncFlush bool
// Cipher is the cipher to use when encrypting.
Expand Down Expand Up @@ -186,42 +188,44 @@ type fileStore struct {
// Represents a message store block and its data.
type msgBlock struct {
// Here for 32bit systems and atomic.
first msgId
last msgId
mu sync.RWMutex
fs *fileStore
aek cipher.AEAD
bek cipher.Stream
seed []byte
nonce []byte
mfn string
mfd *os.File
cmp StoreCompression // Effective compression at the time of loading the block
liwsz int64
index uint32
bytes uint64 // User visible bytes count.
rbytes uint64 // Total bytes (raw) including deleted. Used for rolling to new blk.
msgs uint64 // User visible message count.
fss map[string]*SimpleState
kfn string
lwts int64
llts int64
lrts int64
llseq uint64
hh hash.Hash64
cache *cache
cloads uint64
cexp time.Duration
ctmr *time.Timer
werr error
dmap avl.SequenceSet
fch chan struct{}
qch chan struct{}
lchk [8]byte
loading bool
flusher bool
noTrack bool
closed bool
first msgId
last msgId
mu sync.RWMutex
fs *fileStore
aek cipher.AEAD
bek cipher.Stream
seed []byte
nonce []byte
mfn string
mfd *os.File
cmp StoreCompression // Effective compression at the time of loading the block
liwsz int64
index uint32
bytes uint64 // User visible bytes count.
rbytes uint64 // Total bytes (raw) including deleted. Used for rolling to new blk.
msgs uint64 // User visible message count.
fss map[string]*SimpleState
kfn string
lwts int64
llts int64
lrts int64
llseq uint64
hh hash.Hash64
cache *cache
cloads uint64
cexp time.Duration
ctmr *time.Timer
werr error
dmap avl.SequenceSet
fch chan struct{}
qch chan struct{}
lchk [8]byte
loading bool
flusher bool
noTrack bool
needSync bool
syncAlways bool
closed bool

// Used to mock write failures.
mockWriteErr bool
Expand Down Expand Up @@ -285,7 +289,7 @@ const (
// default cache buffer expiration
defaultCacheBufferExpiration = 5 * time.Second
// default sync interval
defaultSyncInterval = 60 * time.Second
defaultSyncInterval = 2 * time.Minute
// default idle timeout to close FDs.
closeFDsIdle = 30 * time.Second
// coalesceMinimum
Expand Down Expand Up @@ -846,7 +850,7 @@ func (fs *fileStore) noTrackSubjects() bool {

// Will init the basics for a message block.
func (fs *fileStore) initMsgBlock(index uint32) *msgBlock {
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire, noTrack: fs.noTrackSubjects()}
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire, noTrack: fs.noTrackSubjects(), syncAlways: fs.fcfg.SyncAlways}

mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = filepath.Join(mdir, fmt.Sprintf(blkScan, index))
Expand Down Expand Up @@ -2774,7 +2778,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
}
}

mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire, noTrack: fs.noTrackSubjects()}
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire, noTrack: fs.noTrackSubjects(), syncAlways: fs.fcfg.SyncAlways}

// Lock should be held to quiet race detector.
mb.mu.Lock()
Expand Down Expand Up @@ -4403,10 +4407,16 @@ func (mb *msgBlock) pendingWriteSize() int {
if mb == nil {
return 0
}

mb.mu.RLock()
defer mb.mu.RUnlock()
return mb.pendingWriteSizeLocked()
}

// How many bytes pending to be written for this message block.
func (mb *msgBlock) pendingWriteSizeLocked() int {
if mb == nil {
return 0
}
var pending int
if !mb.closed && mb.mfd != nil && mb.cache != nil {
pending = len(mb.cache.buf) - int(mb.cache.wp)
Expand Down Expand Up @@ -4680,27 +4690,49 @@ func (fs *fileStore) syncBlocks() {
fs.mu.RUnlock()

for _, mb := range blks {
// Flush anything that may be pending.
if mb.pendingWriteSize() > 0 {
mb.flushPendingMsgs()
}
// Do actual sync. Hold lock for consistency.
mb.mu.Lock()
if !mb.closed {
if mb.closed {
mb.mu.Unlock()
continue
}
if mb.needSync {
// Flush anything that may be pending.
if mb.pendingWriteSizeLocked() > 0 {
mb.flushPendingMsgsLocked()
}
if mb.mfd != nil {
mb.mfd.Sync()
} else {
fd, err := os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms)
if err != nil {
mb.mu.Unlock()
continue
}
fd.Sync()
fd.Close()
}
// See if we can close FDs due to being idle.
if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
mb.dirtyCloseWithRemove(false)
}
mb.needSync = false
}
// See if we can close FDs due to being idle.
if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
mb.dirtyCloseWithRemove(false)
}
mb.mu.Unlock()
}

fs.mu.Lock()
fs.syncTmr = time.AfterFunc(fs.fcfg.SyncInterval, fs.syncBlocks)
fn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
syncAlways := fs.fcfg.SyncAlways
fs.mu.Unlock()

if !syncAlways {
if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil {
fd.Sync()
fd.Close()
}
}
}

// Select the message block where this message should be found.
Expand Down Expand Up @@ -4962,6 +4994,13 @@ func (mb *msgBlock) flushPendingMsgsLocked() (*LostStreamData, error) {
return fsLostData, mb.werr
}

// Check if we are in sync always mode.
if mb.syncAlways {
mb.mfd.Sync()
} else {
mb.needSync = true
}

// Check for additional writes while we were writing to the disk.
moreBytes := len(mb.cache.buf) - mb.cache.wp - lob

Expand Down Expand Up @@ -6158,9 +6197,12 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
fs.state.FirstTime = time.Time{}
deleted++
} else {
// Make sure to sync changes.
smb.needSync = true
// Update fs first seq and time.
smb.first.seq = seq - 1 // Just for start condition for selectNextFirst.
smb.selectNextFirst()

fs.state.FirstSeq = smb.first.seq
fs.state.FirstTime = time.Unix(0, smb.first.ts).UTC()

Expand Down Expand Up @@ -6902,7 +6944,9 @@ func (fs *fileStore) writeFullState() error {
}
tmpName := f.Name()
defer os.Remove(tmpName)
_, err = f.Write(buf)
if _, err = f.Write(buf); err == nil && fs.fcfg.SyncAlways {
f.Sync()
}
f.Close()
if err != nil {
return err
Expand Down
37 changes: 37 additions & 0 deletions server/filestore_test.go
Expand Up @@ -5850,3 +5850,40 @@ func TestFileStoreErrPartialLoadOnSyncClose(t *testing.T) {
_, err = fs.LoadMsg(1, nil)
require_NoError(t, err)
}

// TestFileStoreSyncIntervals checks how the last message block's needSync flag
// behaves in the two sync modes of the file store:
//   - with a short SyncInterval, a stored message marks the block as needing a
//     sync and the flag is cleared again once the background sync has run;
//   - with SyncAlways set, a stored message never leaves the flag set.
func TestFileStoreSyncIntervals(t *testing.T) {
	fcfg := FileStoreConfig{StoreDir: t.TempDir(), SyncInterval: 250 * time.Millisecond}
	fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
	require_NoError(t, err)
	defer fs.Stop()

	// needSync reads the flag from the current last message block. It closes
	// over the fs variable, so it follows the store when fs is reassigned below.
	needSync := func() bool {
		fs.mu.RLock()
		lmb := fs.lmb
		fs.mu.RUnlock()
		lmb.mu.RLock()
		defer lmb.mu.RUnlock()
		return lmb.needSync
	}

	checkSyncFlag := func(expected bool) {
		// Report failures at the caller's line, not inside this closure.
		t.Helper()
		if got := needSync(); got != expected {
			t.Fatalf("Expected needSync to be %v, got %v", expected, got)
		}
	}

	checkSyncFlag(false)
	// Check the store error so a failed write is not misreported as a flag mismatch.
	_, _, err = fs.StoreMsg("Z", nil, []byte("hello"))
	require_NoError(t, err)
	checkSyncFlag(true)

	// Poll for the flag to clear instead of relying on a single fixed sleep,
	// which can race the 250ms sync timer on a loaded machine.
	deadline := time.Now().Add(2 * time.Second)
	for needSync() && time.Now().Before(deadline) {
		time.Sleep(25 * time.Millisecond)
	}
	checkSyncFlag(false)
	// Stop the first store before reopening the same directory. The deferred
	// Stop above is already bound to this first store instance.
	fs.Stop()

	// Now check always
	fcfg.SyncInterval = 10 * time.Second
	fcfg.SyncAlways = true
	fs, err = newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
	require_NoError(t, err)
	defer fs.Stop()

	checkSyncFlag(false)
	_, _, err = fs.StoreMsg("Z", nil, []byte("hello"))
	require_NoError(t, err)
	checkSyncFlag(false)
}
29 changes: 19 additions & 10 deletions server/jetstream.go
Expand Up @@ -38,12 +38,14 @@ import (
// JetStreamConfig determines this server's configuration.
// MaxMemory and MaxStore are in bytes.
type JetStreamConfig struct {
MaxMemory int64 `json:"max_memory"`
MaxStore int64 `json:"max_storage"`
StoreDir string `json:"store_dir,omitempty"`
Domain string `json:"domain,omitempty"`
CompressOK bool `json:"compress_ok,omitempty"`
UniqueTag string `json:"unique_tag,omitempty"`
MaxMemory int64 `json:"max_memory"`
MaxStore int64 `json:"max_storage"`
StoreDir string `json:"store_dir,omitempty"`
SyncInterval time.Duration `json:"sync_interval,omitempty"`
SyncAlways bool `json:"sync_always,omitempty"`
Domain string `json:"domain,omitempty"`
CompressOK bool `json:"compress_ok,omitempty"`
UniqueTag string `json:"unique_tag,omitempty"`
}

// Statistics about JetStream for this server.
Expand Down Expand Up @@ -490,10 +492,12 @@ func (s *Server) updateJetStreamInfoStatus(enabled bool) {
func (s *Server) restartJetStream() error {
opts := s.getOpts()
cfg := JetStreamConfig{
StoreDir: opts.StoreDir,
MaxMemory: opts.JetStreamMaxMemory,
MaxStore: opts.JetStreamMaxStore,
Domain: opts.JetStreamDomain,
StoreDir: opts.StoreDir,
SyncInterval: opts.SyncInterval,
SyncAlways: opts.SyncAlways,
MaxMemory: opts.JetStreamMaxMemory,
MaxStore: opts.JetStreamMaxStore,
Domain: opts.JetStreamDomain,
}
s.Noticef("Restarting JetStream")
err := s.EnableJetStream(&cfg)
Expand Down Expand Up @@ -2397,6 +2401,10 @@ func (s *Server) dynJetStreamConfig(storeDir string, maxStore, maxMem int64) *Je

opts := s.getOpts()

// Sync options.
jsc.SyncInterval = opts.SyncInterval
jsc.SyncAlways = opts.SyncAlways

if opts.maxStoreSet && maxStore >= 0 {
jsc.MaxStore = maxStore
} else {
Expand All @@ -2413,6 +2421,7 @@ func (s *Server) dynJetStreamConfig(storeDir string, maxStore, maxMem int64) *Je
jsc.MaxMemory = JetStreamMaxMemDefault
}
}

return jsc
}

Expand Down
50 changes: 50 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -21600,6 +21600,7 @@ func TestJetStreamChangeMaxMessagesPerSubject(t *testing.T) {
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Expand Down Expand Up @@ -21766,3 +21767,52 @@ func TestJetStreamConsumerDefaultsFromStream(t *testing.T) {
}
})
}

// TestJetStreamSyncInterval checks that the sync_interval setting in the
// jetstream configuration block ends up both in the server options and in the
// file store configuration of a newly created stream. Three cases are covered:
// no setting (default interval), an explicit duration, and "always", which is
// expected to keep the default interval and turn SyncAlways on in the store.
func TestJetStreamSyncInterval(t *testing.T) {
	sd := t.TempDir()
	tmpl := `
listen: 127.0.0.1:-1
jetstream: {
store_dir: %q
%s
}`

	for _, test := range []struct {
		name     string
		sync     string
		expected time.Duration
		always   bool
	}{
		{"Default", _EMPTY_, defaultSyncInterval, false},
		{"10s", "sync_interval: 10s", 10 * time.Second, false},
		{"Always", "sync_interval: always", defaultSyncInterval, true},
	} {
		t.Run(test.name, func(t *testing.T) {
			conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, sd, test.sync)))
			s, _ := RunServerWithConfig(conf)
			defer s.Shutdown()

			opts := s.getOpts()
			require_True(t, opts.SyncInterval == test.expected)

			nc, js := jsClientConnect(t, s)
			defer nc.Close()

			_, err := js.AddStream(&nats.StreamConfig{
				Name:     "TEST",
				Subjects: []string{"test.>"},
			})
			require_NoError(t, err)

			mset, err := s.GlobalAccount().lookupStream("TEST")
			require_NoError(t, err)

			// Fail the test cleanly rather than panic if the stream is not
			// backed by a file store.
			fs, ok := mset.store.(*fileStore)
			if !ok {
				t.Fatalf("Expected a *fileStore, got %T", mset.store)
			}

			fs.mu.RLock()
			fsSync := fs.fcfg.SyncInterval
			syncAlways := fs.fcfg.SyncAlways
			fs.mu.RUnlock()
			require_True(t, fsSync == test.expected)
			require_True(t, syncAlways == test.always)
		})
	}
}

0 comments on commit bcb45b5

Please sign in to comment.