Filestore tweaks and improvements (#4481)
Optimized for startup restore time.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Sep 4, 2023
2 parents e11ddb8 + 1768f9c commit 0e78f18
Showing 6 changed files with 239 additions and 209 deletions.
171 changes: 89 additions & 82 deletions server/filestore.go
@@ -433,19 +433,25 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// Check if our prior state remembers a last sequence past what we can see.
if fs.ld != nil && prior.LastSeq > fs.state.LastSeq {
fs.state.LastSeq, fs.state.LastTime = prior.LastSeq, prior.LastTime
if _, err = fs.newMsgBlockForWrite(); err != nil {
if lmb, err := fs.newMsgBlockForWrite(); err == nil {
lmb.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano())
} else {
return nil, err
}
}
// Since we recovered here, make sure to kick ourselves to write out our stream state.
fs.dirty++
defer fs.kickFlushStateLoop()
// Also make sure we get rid of old idx and fss files on return.
defer func() {
}

// Also make sure we get rid of old idx and fss files on return.
// Do this in a separate go routine vs inline, and at the end of processing.
defer func() {
go func() {
os.RemoveAll(filepath.Join(fs.fcfg.StoreDir, msgDir, indexScanAll))
os.RemoveAll(filepath.Join(fs.fcfg.StoreDir, msgDir, fssScanAll))
}()
}
}()

// Lock while we do enforcements and removals.
fs.mu.Lock()
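For context on the hunk above: when the prior state remembers a last sequence beyond what was recovered, the new code writes a tombstone for it, and the old idx/fss cleanup is moved into a goroutine launched from the deferred function so it no longer runs inline during startup. A minimal, self-contained sketch of that deferred background-cleanup pattern follows; the directory layout and file names are hypothetical, not the server's.

```go
package main

import (
	"os"
	"path/filepath"
)

// recoverStore sketches the cleanup pattern: the deferred function only
// launches a goroutine, so startup is not blocked on filesystem deletes.
func recoverStore(storeDir string) error {
	defer func() {
		go func() {
			// Hypothetical legacy artifacts from older on-disk formats.
			os.RemoveAll(filepath.Join(storeDir, "msgs", "old.idx"))
			os.RemoveAll(filepath.Join(storeDir, "msgs", "old.fss"))
		}()
	}()

	// ... actual recovery work would run here ...
	return nil
}

func main() {
	_ = recoverStore("/tmp/store")
}
```

Running the removal after the deferred function returns keeps the startup path short; os.RemoveAll on an already-missing path simply returns nil.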
@@ -454,6 +460,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
if len(fs.tombs) > 0 {
for _, seq := range fs.tombs {
fs.removeMsg(seq, false, false, false)
fs.removeFromLostData(seq)
}
// Not needed after this phase.
fs.tombs = nil
@@ -931,7 +938,6 @@ func (mb *msgBlock) ensureLastChecksumLoaded() {
// Lock held on entry
func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) {
mb := fs.initMsgBlock(index)
fs.loadEncryptionForMsgBlock(mb)

// Open up the message file, but we will try to recover from the index file.
// We will check that the last checksums match.
@@ -946,6 +952,10 @@ func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) {
} else {
return nil, err
}

// Make sure encryption is loaded if needed.
fs.loadEncryptionForMsgBlock(mb)

// Grab last checksum from main block file.
var lchk [8]byte
if mb.rbytes >= checksumSize {
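recoverMsgBlock now loads encryption for the block only after attempting recovery from the index file, and then grabs the last checksum from the main block file for comparison. The sketch below reads such a trailing checksum in isolation, assuming (as checksumSize suggests) an 8-byte checksum stored at the end of the file; the function and file names are illustrative, not the server's.

```go
package main

import (
	"fmt"
	"os"
)

const checksumSize = 8

// lastChecksum reads only the trailing checksum bytes of a block file so it
// can be compared against a previously recorded value.
func lastChecksum(fn string) ([checksumSize]byte, error) {
	var chk [checksumSize]byte
	f, err := os.Open(fn)
	if err != nil {
		return chk, err
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		return chk, err
	}
	if fi.Size() < checksumSize {
		return chk, fmt.Errorf("block file too small: %d bytes", fi.Size())
	}
	// Read only the last 8 bytes of the file.
	if _, err := f.ReadAt(chk[:], fi.Size()-checksumSize); err != nil {
		return chk, err
	}
	return chk, nil
}

func main() {
	if chk, err := lastChecksum("1.blk"); err == nil {
		fmt.Printf("last checksum: %x\n", chk)
	}
}
```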
@@ -1014,15 +1024,50 @@ func (fs *fileStore) addLostData(ld *LostStreamData) {
return
}
if fs.ld != nil {
fs.ld.Msgs = append(fs.ld.Msgs, ld.Msgs...)
msgs := fs.ld.Msgs
sort.Slice(msgs, func(i, j int) bool { return msgs[i] < msgs[j] })
fs.ld.Bytes += ld.Bytes
var added bool
for _, seq := range ld.Msgs {
if _, found := fs.ld.exists(seq); !found {
fs.ld.Msgs = append(fs.ld.Msgs, seq)
added = true
}
}
if added {
msgs := fs.ld.Msgs
sort.Slice(msgs, func(i, j int) bool { return msgs[i] < msgs[j] })
fs.ld.Bytes += ld.Bytes
}
} else {
fs.ld = ld
}
}

// Helper to see if we already have this sequence reported in our lost data.
func (ld *LostStreamData) exists(seq uint64) (int, bool) {
i, found := sort.Find(len(ld.Msgs), func(i int) int {
tseq := ld.Msgs[i]
if tseq < seq {
return +1
}
if tseq > seq {
return -1
}
return 0
})
return i, found
}

func (fs *fileStore) removeFromLostData(seq uint64) {
if fs.ld == nil {
return
}
if i, found := fs.ld.exists(seq); found {
fs.ld.Msgs = append(fs.ld.Msgs[:i], fs.ld.Msgs[i+1:]...)
if len(fs.ld.Msgs) == 0 {
fs.ld = nil
}
}
}
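addLostData now de-duplicates incoming sequences against the existing lost-data set before merging and keeps fs.ld.Msgs sorted, which is what lets the new exists helper binary-search the slice and removeFromLostData splice out a single sequence (used above when tombstones are replayed). The standalone sketch below shows the same sorted-slice bookkeeping with plain types; it uses sort.Search rather than sort.Find, an equivalent way to locate a value in an ascending slice.

```go
package main

import (
	"fmt"
	"sort"
)

// lostData is a stand-in for the server's LostStreamData: msgs is kept
// sorted ascending so lookups can use binary search.
type lostData struct {
	msgs []uint64
}

// exists reports whether seq is tracked, and at which index.
func (ld *lostData) exists(seq uint64) (int, bool) {
	i := sort.Search(len(ld.msgs), func(i int) bool { return ld.msgs[i] >= seq })
	return i, i < len(ld.msgs) && ld.msgs[i] == seq
}

// add inserts seq only if it is not already present, keeping msgs sorted.
func (ld *lostData) add(seq uint64) {
	if i, found := ld.exists(seq); !found {
		ld.msgs = append(ld.msgs, 0)
		copy(ld.msgs[i+1:], ld.msgs[i:])
		ld.msgs[i] = seq
	}
}

// remove splices seq out if present, preserving order.
func (ld *lostData) remove(seq uint64) {
	if i, found := ld.exists(seq); found {
		ld.msgs = append(ld.msgs[:i], ld.msgs[i+1:]...)
	}
}

func main() {
	var ld lostData
	for _, seq := range []uint64{40, 10, 30, 10, 20} {
		ld.add(seq)
	}
	ld.remove(30)
	fmt.Println(ld.msgs) // [10 20 40]
}
```

Keeping the slice sorted is also what makes the per-sequence removal cheap, which appears to be why the tombstone replay above now calls fs.removeFromLostData for each replayed sequence.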

func (fs *fileStore) rebuildState(ld *LostStreamData) {
fs.mu.Lock()
defer fs.mu.Unlock()
@@ -1387,48 +1432,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
return nil, tombstones, nil
}

// Used when we scan the msg blocks.
type blockFiles struct {
blksSeen map[uint32]struct{}
maxIndex uint32
}

// This will grab all the block files.
func (fs *fileStore) grabMsgBlockFiles(ch chan *blockFiles) {
f, err := os.Open(filepath.Join(fs.fcfg.StoreDir, msgDir))
if err != nil {
ch <- nil
return
}
defer f.Close()

dirs, err := f.ReadDir(-1)
if err != nil {
ch <- nil
return
}

result := &blockFiles{blksSeen: make(map[uint32]struct{})}

for _, fi := range dirs {
var index uint32
if n, err := fmt.Sscanf(fi.Name(), blkScan, &index); err == nil && n == 1 {
result.blksSeen[index] = struct{}{}
if index > result.maxIndex {
result.maxIndex = index
}
}
}
ch <- result
}

// recoverFullState will attempt to recover our last full state and re-process any state changes
// that happened afterwards.
func (fs *fileStore) recoverFullState() (rerr error) {
// Grab all the msgBlock files in parallel in case there are many.
rch := make(chan *blockFiles, 1)
go fs.grabMsgBlockFiles(rch)

fs.mu.Lock()
defer fs.mu.Unlock()

@@ -1438,6 +1444,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
if _, err := os.Stat(pdir); err == nil {
os.RemoveAll(pdir)
}

// Grab our stream state file and load it in.
fn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
buf, err := os.ReadFile(fn)
@@ -1586,54 +1593,42 @@ func (fs *fileStore) recoverFullState() (rerr error) {
return errCorruptState
}

// Grab the max blk index we see from scanning the directory. The full snapshot has the index that was the lmb when
// we created it, so with that and the max we know which blocks to process. We do this in parallel in case there are lots of blks.
blkFiles := <-rch

defer func() {
// Make sure we saw all of our blk files.
for _, mb := range fs.blks {
if _, ok := blkFiles.blksSeen[mb.index]; !ok {
if ld, _, _ := mb.rebuildState(); ld != nil {
// If we have lost data make sure we track here.
fs.addLostData(ld)
rerr = errCorruptState
}
}
}
}()

// Move into place our state, msgBlks and subject info.
fs.state = state

// If our saved state is past what we see on disk, fallback and rebuild.
if blkFiles != nil && blkFiles.maxIndex < blkIndex {
return errPriorState
}

// First let's check the happy path, open the blk file that was the lmb when we created the full state.
// See if we have the last block available.
var matched bool
var mb *msgBlock
if mb = fs.bim[blkIndex]; mb != nil {
matched = bytes.Equal(mb.lastChecksum(), lchk[:])
if matched && blkIndex == blkFiles.maxIndex {
return nil
if _, err := os.Stat(mb.mfn); err != nil && os.IsNotExist(err) {
// If our saved state is past what we see on disk, fallback and rebuild.
if ld, _, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}
return errPriorState
}

if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// Remove the last message block since we will re-process below.
fs.removeMsgBlockFromList(mb)
}
// Remove the last message block since we will re-process below.
fs.removeMsgBlockFromList(mb)
}

// If we are here we did not match the happy path.
// We need to go through and find our checksum. This should be in blkIndex, but might not be.
start, stop := blkIndex, blkFiles.maxIndex
// We may need to check other blocks. Even if we matched the last checksum we will see if there is another block.
// If we did not match, we re-process the last block.
start := blkIndex
if matched {
start++
}

for bi := start; bi <= stop; bi++ {
for bi := start; ; bi++ {
nmb, err := fs.recoverMsgBlock(bi)
if err != nil {
if os.IsNotExist(err) {
return nil
}
os.Remove(fn)
return err
}
if nmb != nil {
@@ -1656,8 +1651,6 @@ func (fs *fileStore) recoverFullState() (rerr error) {
fs.state.Bytes += nmb.bytes
}
}

return nil
}
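recoverFullState no longer scans the whole message directory up front (the parallel grabMsgBlockFiles pass above is removed); instead it walks block indexes forward from the block recorded in the snapshot and stops when recoverMsgBlock reports the file does not exist. A minimal sketch of that walk, with a hypothetical block-file naming scheme:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// walkBlocks starts at a known block index and walks upward until a block
// file is missing, rather than listing the whole directory first.
func walkBlocks(dir string, start uint32) ([]string, error) {
	var blocks []string
	for bi := start; ; bi++ {
		fn := filepath.Join(dir, fmt.Sprintf("%d.blk", bi))
		if _, err := os.Stat(fn); err != nil {
			if os.IsNotExist(err) {
				// First missing index marks the end of the blocks to recover.
				return blocks, nil
			}
			return blocks, err
		}
		blocks = append(blocks, fn)
	}
}

func main() {
	blocks, err := walkBlocks("/tmp/store/msgs", 1)
	fmt.Println(len(blocks), "blocks found, err:", err)
}
```

Only blocks at or after the snapshot's last block get touched this way, which is presumably where the startup-restore-time win in the commit message comes from.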

// adjustAccounting will be called when a stream state was only partially accounted for
@@ -4237,6 +4230,8 @@ func (fs *fileStore) checkMsgs() *LostStreamData {
fs.psim = make(map[string]*psi)

for _, mb := range fs.blks {
// Make sure encryption is loaded if needed for the block.
fs.loadEncryptionForMsgBlock(mb)
// FIXME(dlc) - check tombstones here too?
if ld, _, err := mb.rebuildState(); err != nil && ld != nil {
// Rebuild fs state too.
@@ -5029,6 +5024,9 @@ func (mb *msgBlock) cacheNotLoaded() bool {
func (mb *msgBlock) loadBlock(buf []byte) ([]byte, error) {
f, err := os.Open(mb.mfn)
if err != nil {
if os.IsNotExist(err) {
err = errNoBlkData
}
return nil, err
}
defer f.Close()
@@ -5117,6 +5115,12 @@ checkCache:
// We want to hold the mb lock here to avoid any changes to state.
buf, err := mb.loadBlock(nil)
if err != nil {
if err == errNoBlkData {
if ld, _, err := mb.rebuildStateLocked(); err != nil && ld != nil {
// Rebuild fs state too.
go mb.fs.rebuildState(ld)
}
}
return err
}
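loadBlock now maps a missing block file to the new errNoBlkData sentinel, and the load path above reacts to that sentinel by rebuilding state rather than just returning the raw I/O error. Below is the same sentinel-error shape in isolation; errNoBlkData matches the diff, but loadBlock here is simplified to os.ReadFile and the rest of the names are illustrative.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// Sentinel mirroring the new errNoBlkData in the diff.
var errNoBlkData = errors.New("message block data missing")

// loadBlock is a simplified stand-in: translate a missing file into the
// sentinel so callers can distinguish "block gone" from other I/O errors.
func loadBlock(fn string) ([]byte, error) {
	buf, err := os.ReadFile(fn)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, errNoBlkData
		}
		return nil, err
	}
	return buf, nil
}

func main() {
	if _, err := loadBlock("missing.blk"); errors.Is(err, errNoBlkData) {
		// A caller could kick off a state rebuild here instead of failing hard.
		fmt.Println("block data missing, would rebuild state")
	}
}
```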

@@ -5197,6 +5201,7 @@ var (
errMsgBlkTooBig = errors.New("message block size exceeded int capacity")
errUnknownCipher = errors.New("unknown cipher")
errNoMainKey = errors.New("encrypted store encountered with no main key")
errNoBlkData = errors.New("message block data missing")
)

const (
@@ -6722,7 +6727,6 @@ func (fs *fileStore) Delete() error {
}
return ErrStoreClosed
}
fs.Purge()

pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir)
// If purge directory still exists then we need to wait
@@ -6731,6 +6735,9 @@
os.RemoveAll(pdir)
}

// Do Purge() since, if we have lots of blocks, it uses a mv/rename.
fs.Purge()

if err := fs.Stop(); err != nil {
return err
}
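Delete now waits for any previous purge directory to clear before calling Purge, and the new comment explains why Purge stays in the path: with many blocks it is mostly a single rename rather than per-file deletes. A rough sketch of that rename-then-remove idea with hypothetical directory names (this is not the server's Purge):

```go
package main

import (
	"os"
	"path/filepath"
)

// purge sketches the rename-based purge: one rename regardless of how many
// block files exist, with the real deletion pushed to the background.
func purge(storeDir string) error {
	msgs := filepath.Join(storeDir, "msgs")
	pdir := filepath.Join(storeDir, "purge")

	// Clear any leftover purge directory so the rename cannot collide.
	if _, err := os.Stat(pdir); err == nil {
		if err := os.RemoveAll(pdir); err != nil {
			return err
		}
	}
	// Move the whole message directory aside in a single cheap operation.
	if err := os.Rename(msgs, pdir); err != nil {
		return err
	}
	// Delete it in the background and recreate an empty directory for use.
	go os.RemoveAll(pdir)
	return os.MkdirAll(msgs, 0750)
}

func main() {
	_ = purge("/tmp/store")
}
```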
