Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filestore tweaks and improvements #4481

Merged
merged 4 commits into from Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
171 changes: 89 additions & 82 deletions server/filestore.go
Expand Up @@ -433,19 +433,25 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// Check if our prior state remembers a last sequence past where we can see.
if fs.ld != nil && prior.LastSeq > fs.state.LastSeq {
fs.state.LastSeq, fs.state.LastTime = prior.LastSeq, prior.LastTime
if _, err = fs.newMsgBlockForWrite(); err != nil {
if lmb, err := fs.newMsgBlockForWrite(); err == nil {
lmb.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano())
} else {
return nil, err
}
}
// Since we recovered here, make sure to kick ourselves to write out our stream state.
fs.dirty++
defer fs.kickFlushStateLoop()
// Also make sure we get rid of old idx and fss files on return.
defer func() {
}

// Also make sure we get rid of old idx and fss files on return.
// Do this in separate go routine vs inline and at end of processing.
defer func() {
go func() {
os.RemoveAll(filepath.Join(fs.fcfg.StoreDir, msgDir, indexScanAll))
os.RemoveAll(filepath.Join(fs.fcfg.StoreDir, msgDir, fssScanAll))
}()
}
}()

// Lock while do enforcements and removals.
fs.mu.Lock()
Expand All @@ -454,6 +460,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
if len(fs.tombs) > 0 {
for _, seq := range fs.tombs {
fs.removeMsg(seq, false, false, false)
fs.removeFromLostData(seq)
}
// Not needed after this phase.
fs.tombs = nil
Expand Down Expand Up @@ -931,7 +938,6 @@ func (mb *msgBlock) ensureLastChecksumLoaded() {
// Lock held on entry
func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) {
mb := fs.initMsgBlock(index)
fs.loadEncryptionForMsgBlock(mb)

// Open up the message file, but we will try to recover from the index file.
// We will check that the last checksums match.
Expand All @@ -946,6 +952,10 @@ func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) {
} else {
return nil, err
}

// Make sure encryption loaded if needed.
fs.loadEncryptionForMsgBlock(mb)

// Grab last checksum from main block file.
var lchk [8]byte
if mb.rbytes >= checksumSize {
Expand Down Expand Up @@ -1014,15 +1024,50 @@ func (fs *fileStore) addLostData(ld *LostStreamData) {
return
}
if fs.ld != nil {
fs.ld.Msgs = append(fs.ld.Msgs, ld.Msgs...)
msgs := fs.ld.Msgs
sort.Slice(msgs, func(i, j int) bool { return msgs[i] < msgs[j] })
fs.ld.Bytes += ld.Bytes
var added bool
for _, seq := range ld.Msgs {
if _, found := fs.ld.exists(seq); !found {
fs.ld.Msgs = append(fs.ld.Msgs, seq)
added = true
}
}
if added {
msgs := fs.ld.Msgs
sort.Slice(msgs, func(i, j int) bool { return msgs[i] < msgs[j] })
fs.ld.Bytes += ld.Bytes
}
} else {
fs.ld = ld
}
}

// exists reports whether seq is already recorded in our lost data.
// It returns the position of seq within ld.Msgs and true when present. When seq
// is absent it returns the index at which seq would be inserted and false.
// This is a binary search, so ld.Msgs must be sorted in ascending order.
func (ld *LostStreamData) exists(seq uint64) (int, bool) {
	return sort.Find(len(ld.Msgs), func(i int) int {
		tseq := ld.Msgs[i]
		// sort.Find wants the target compared against entry i: positive when
		// the target sorts after the entry, negative when it sorts before it.
		switch {
		case seq > tseq:
			return +1
		case seq < tseq:
			return -1
		default:
			return 0
		}
	})
}

// removeFromLostData drops seq from the tracked lost data if it is listed there.
// When the last lost sequence is removed the lost data record itself is cleared.
// The Bytes total of the record is left untouched.
func (fs *fileStore) removeFromLostData(seq uint64) {
	lost := fs.ld
	if lost == nil {
		// Nothing is being tracked.
		return
	}
	idx, ok := lost.exists(seq)
	if !ok {
		return
	}
	// Splice the entry out in place.
	lost.Msgs = append(lost.Msgs[:idx], lost.Msgs[idx+1:]...)
	if len(lost.Msgs) == 0 {
		fs.ld = nil
	}
}

func (fs *fileStore) rebuildState(ld *LostStreamData) {
fs.mu.Lock()
defer fs.mu.Unlock()
Expand Down Expand Up @@ -1387,48 +1432,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
return nil, tombstones, nil
}

// Used when we scan the msg blocks.
// blockFiles summarizes which block files were found in the message directory.
type blockFiles struct {
// Set of block indexes parsed from the file names that were seen.
blksSeen map[uint32]struct{}
// Highest block index among the files that were seen.
maxIndex uint32
}

// grabMsgBlockFiles scans the message directory for block files and delivers
// the summary on ch. Exactly one value is sent: nil when the directory could
// not be opened or read, otherwise the set of block indexes seen together with
// the highest index.
func (fs *fileStore) grabMsgBlockFiles(ch chan *blockFiles) {
	mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)

	dir, err := os.Open(mdir)
	if err != nil {
		ch <- nil
		return
	}
	defer dir.Close()

	entries, err := dir.ReadDir(-1)
	if err != nil {
		ch <- nil
		return
	}

	seen := make(map[uint32]struct{})
	var highest uint32

	for _, entry := range entries {
		var idx uint32
		n, err := fmt.Sscanf(entry.Name(), blkScan, &idx)
		if err != nil || n != 1 {
			// Not a block file name, ignore.
			continue
		}
		seen[idx] = struct{}{}
		if idx > highest {
			highest = idx
		}
	}

	ch <- &blockFiles{blksSeen: seen, maxIndex: highest}
}

// recoverFullState will attempt to recover our last full state and re-process any state changes
// that happened afterwards.
func (fs *fileStore) recoverFullState() (rerr error) {
// Grab all the msgBlock files in parallel in case there are many.
rch := make(chan *blockFiles, 1)
go fs.grabMsgBlockFiles(rch)

fs.mu.Lock()
defer fs.mu.Unlock()

Expand All @@ -1438,6 +1444,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
if _, err := os.Stat(pdir); err == nil {
os.RemoveAll(pdir)
}

// Grab our stream state file and load it in.
fn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
buf, err := os.ReadFile(fn)
Expand Down Expand Up @@ -1586,54 +1593,42 @@ func (fs *fileStore) recoverFullState() (rerr error) {
return errCorruptState
}

// Grab the max blk index we see from scanning the directory. The full snapshot has the index that was lmb when
// we created it, so with that and max we know blocks to process. We do this in parallel in case there are lots of blks.
blkFiles := <-rch

defer func() {
// Make sure we saw all of our blk files.
for _, mb := range fs.blks {
if _, ok := blkFiles.blksSeen[mb.index]; !ok {
if ld, _, _ := mb.rebuildState(); ld != nil {
// If we have lost data make sure we track here.
fs.addLostData(ld)
rerr = errCorruptState
}
}
}
}()

// Move into place our state, msgBlks and subject info.
fs.state = state

// If our saved state is past what we see on disk, fallback and rebuild.
if blkFiles != nil && blkFiles.maxIndex < blkIndex {
return errPriorState
}

// First let's check the happy path, open the blk file that was the lmb when we created the full state.
// See if we have the last block available.
var matched bool
var mb *msgBlock
if mb = fs.bim[blkIndex]; mb != nil {
matched = bytes.Equal(mb.lastChecksum(), lchk[:])
if matched && blkIndex == blkFiles.maxIndex {
return nil
if _, err := os.Stat(mb.mfn); err != nil && os.IsNotExist(err) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to drop other err here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we fail to open that means last checksum will fail and we will reprocess in recoverMsgs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK makes sense

// If our saved state is past what we see on disk, fallback and rebuild.
if ld, _, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}
return errPriorState
}

if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// Remove the last message block since we will re-process below.
fs.removeMsgBlockFromList(mb)
}
// Remove the last message block since we will re-process below.
fs.removeMsgBlockFromList(mb)
}

// If we are here we did not match the happy path.
// We need to go through and find our checksum. This should be in blkIndex, but might not be.
start, stop := blkIndex, blkFiles.maxIndex
// We may need to check other blocks. Even if we matched last checksum we will see if there is another block.
// If we did not match we re-process the last block.
start := blkIndex
if matched {
start++
}

for bi := start; bi <= stop; bi++ {
for bi := start; ; bi++ {
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
nmb, err := fs.recoverMsgBlock(bi)
if err != nil {
if os.IsNotExist(err) {
return nil
}
os.Remove(fn)
return err
}
if nmb != nil {
Expand All @@ -1656,8 +1651,6 @@ func (fs *fileStore) recoverFullState() (rerr error) {
fs.state.Bytes += nmb.bytes
}
}

return nil
}

// adjustAccounting will be called when a stream state was only partially accounted for
Expand Down Expand Up @@ -4237,6 +4230,8 @@ func (fs *fileStore) checkMsgs() *LostStreamData {
fs.psim = make(map[string]*psi)

for _, mb := range fs.blks {
// Make sure encryption loaded if needed for the block.
fs.loadEncryptionForMsgBlock(mb)
// FIXME(dlc) - check tombstones here too?
if ld, _, err := mb.rebuildState(); err != nil && ld != nil {
// Rebuild fs state too.
Expand Down Expand Up @@ -5029,6 +5024,9 @@ func (mb *msgBlock) cacheNotLoaded() bool {
func (mb *msgBlock) loadBlock(buf []byte) ([]byte, error) {
f, err := os.Open(mb.mfn)
if err != nil {
if os.IsNotExist(err) {
err = errNoBlkData
}
return nil, err
}
defer f.Close()
Expand Down Expand Up @@ -5117,6 +5115,12 @@ checkCache:
// We want to hold the mb lock here to avoid any changes to state.
buf, err := mb.loadBlock(nil)
if err != nil {
if err == errNoBlkData {
if ld, _, err := mb.rebuildStateLocked(); err != nil && ld != nil {
// Rebuild fs state too.
go mb.fs.rebuildState(ld)
}
}
return err
}

Expand Down Expand Up @@ -5197,6 +5201,7 @@ var (
errMsgBlkTooBig = errors.New("message block size exceeded int capacity")
errUnknownCipher = errors.New("unknown cipher")
errNoMainKey = errors.New("encrypted store encountered with no main key")
errNoBlkData = errors.New("message block data missing")
)

const (
Expand Down Expand Up @@ -6722,7 +6727,6 @@ func (fs *fileStore) Delete() error {
}
return ErrStoreClosed
}
fs.Purge()

pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir)
// If purge directory still exists then we need to wait
Expand All @@ -6731,6 +6735,9 @@ func (fs *fileStore) Delete() error {
os.RemoveAll(pdir)
}

// Do Purge() since if we have lots of blocks uses a mv/rename.
fs.Purge()

if err := fs.Stop(); err != nil {
return err
}
Expand Down