Skip to content

Commit

Permalink
Use encoding of avl seqset for writeIndexInfo's delete map.
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Apr 18, 2023
1 parent 1a24e95 commit 333d684
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 33 deletions.
73 changes: 40 additions & 33 deletions server/filestore.go
Expand Up @@ -224,6 +224,8 @@ const (
magic = uint8(22)
// Version
version = uint8(1)
// New IndexInfo Version
newVersion = uint8(2)
// hdrLen
hdrLen = 2
// This is where we keep the streams.
Expand Down Expand Up @@ -689,9 +691,6 @@ const (
emptyRecordLen = msgHdrSize + checksumSize
)

// This is the max room needed for index header.
const indexHdrSize = 7*binary.MaxVarintLen64 + hdrLen + checksumSize

// Lock should be held.
func (fs *fileStore) noTrackSubjects() bool {
return !(len(fs.psim) > 0 || len(fs.cfg.Subjects) > 0 || fs.cfg.Mirror != nil || len(fs.cfg.Sources) > 0)
Expand Down Expand Up @@ -968,7 +967,7 @@ func (mb *msgBlock) convertToEncrypted() error {
return err
}
if buf, err = os.ReadFile(mb.ifn); err == nil && len(buf) > 0 {
if err := checkHeader(buf); err != nil {
if err := checkNewHeader(buf); err != nil {
return err
}
buf = mb.aek.Seal(buf[:0], mb.nonce, buf, nil)
Expand Down Expand Up @@ -5024,11 +5023,12 @@ func (mb *msgBlock) writeIndexInfo() error {
// Filestore lock and mb lock should be held.
func (mb *msgBlock) writeIndexInfoLocked() error {
// HEADER: magic version msgs bytes fseq fts lseq lts ndel checksum
var hdr [indexHdrSize]byte
// Make large enough to hold almost all possible maximum interior delete scenarios.
var hdr [42 * 1024]byte

// Write header
hdr[0] = magic
hdr[1] = version
hdr[1] = newVersion

n := hdrLen
n += binary.PutUvarint(hdr[n:], mb.msgs)
Expand All @@ -5042,7 +5042,16 @@ func (mb *msgBlock) writeIndexInfoLocked() error {

// Append a delete map if needed
if !mb.dmap.IsEmpty() {
buf = append(buf, mb.genDeleteMap()...)
// Always attempt to tack it onto end.
dmap, err := mb.dmap.Encode(hdr[len(buf):])
if err != nil {
return err
}
if len(dmap) < cap(hdr)-len(buf) {
buf = hdr[:len(buf)+len(dmap)]
} else {
buf = append(buf, dmap...)
}
}

// Open our FD if needed.
Expand Down Expand Up @@ -5083,6 +5092,14 @@ func (mb *msgBlock) writeIndexInfoLocked() error {
return err
}

func checkNewHeader(hdr []byte) error {
if hdr == nil || len(hdr) < 2 || hdr[0] != magic ||
(hdr[1] != version && hdr[1] != newVersion) {
return errCorruptState
}
return nil
}

// readIndexInfo will read in the index information for the message block.
func (mb *msgBlock) readIndexInfo() error {
buf, err := os.ReadFile(mb.ifn)
Expand All @@ -5103,7 +5120,7 @@ func (mb *msgBlock) readIndexInfo() error {
}
}

if err := checkHeader(buf); err != nil {
if err := checkNewHeader(buf); err != nil {
defer os.Remove(mb.ifn)
return fmt.Errorf("bad index file")
}
Expand Down Expand Up @@ -5162,38 +5179,28 @@ func (mb *msgBlock) readIndexInfo() error {

// Now check for presence of a delete map
if dmapLen > 0 {
for i := 0; i < int(dmapLen); i++ {
seq := readSeq()
if seq == 0 {
break
// New version is encoded avl seqset.
if buf[1] == newVersion {
dmap, err := avl.Decode(buf[bi:])
if err != nil {
return fmt.Errorf("could not decode avl dmap: %v", err)
}
mb.dmap = *dmap
} else {
// This is the old version.
for i := 0; i < int(dmapLen); i++ {
seq := readSeq()
if seq == 0 {
break
}
mb.dmap.Insert(seq + mb.first.seq)
}
mb.dmap.Insert(seq + mb.first.seq)
}
}

return nil
}

// genDeleteMap serializes the block's delete map as a run of uvarints.
// Each sequence is written as an offset from the block's first sequence to
// keep the encoding small. Returns nil when the delete map is empty.
// As a side effect, entries below the first sequence are removed from the map.
func (mb *msgBlock) genDeleteMap() []byte {
	if mb.dmap.IsEmpty() {
		return nil
	}
	// Worst case every entry needs a full-width varint.
	out := make([]byte, mb.dmap.Size()*binary.MaxVarintLen64)
	base := uint64(mb.first.seq)
	used := 0

	mb.dmap.Range(func(seq uint64) bool {
		if seq >= base {
			used += binary.PutUvarint(out[used:], seq-base)
			return true
		}
		// Lazy cleanup: the first sequence has moved past this entry.
		mb.dmap.Delete(seq)
		return true
	})
	return out[:used]
}

func syncAndClose(mfd, ifd *os.File) {
if mfd != nil {
mfd.Sync()
Expand Down
57 changes: 57 additions & 0 deletions server/filestore_test.go
Expand Up @@ -5400,3 +5400,60 @@ func TestFileStoreSubjectsTotals(t *testing.T) {
t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st))
}
}

// TestFileStoreNewWriteIndexInfo stores enough messages to fill a large block,
// removes every interior message so that only the first and last remain, and
// then checks that writeIndexInfoLocked is quick, that the resulting index
// file stays small, and that readIndexInfo restores consistent block state.
func TestFileStoreNewWriteIndexInfo(t *testing.T) {
	testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
		fcfg.BlockSize = defaultLargeBlockSize

		fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
		require_NoError(t, err)
		defer fs.Stop()

		// Fill a block.
		numToFill := 254200
		for i := 0; i < numToFill; i++ {
			_, _, err := fs.StoreMsg("A", nil, []byte("OK"))
			require_NoError(t, err)
		}

		// Maximize interior deletes for testing the new AVL sequence set.
		// Sequences 1 and numToFill are left in place.
		for seq := uint64(2); seq < uint64(numToFill); seq++ {
			removed, err := fs.RemoveMsg(seq)
			require_NoError(t, err)
			require_True(t, removed)
		}
		// Grab first block
		fs.mu.RLock()
		mb := fs.blks[0]
		fs.mu.RUnlock()

		// Capture results while holding the block lock and assert only after
		// releasing it, matching the read-back section below. The require_*
		// helpers presumably abort the test on failure; doing that with mb.mu
		// held would leave the lock held while the deferred fs.Stop() runs.
		mb.mu.Lock()
		start := time.Now()
		werr := mb.writeIndexInfoLocked()
		elapsed := time.Since(start)
		fi, err := os.Stat(mb.ifn)
		mb.mu.Unlock()

		require_NoError(t, werr)
		require_True(t, elapsed < time.Millisecond)
		require_NoError(t, err)
		require_True(t, fi.Size() < 34*1024) // Just over 32k

		// Drop the in-memory delete map and reload state from the index file.
		mb.mu.Lock()
		mb.dmap.Empty()
		err = mb.readIndexInfo()
		numMsgs := mb.msgs
		firstSeq := mb.first.seq
		lastSeq := mb.last.seq
		mb.mu.Unlock()
		// Make sure consistent.
		require_NoError(t, err)
		require_True(t, numMsgs == 2)
		require_True(t, firstSeq == 1)
		require_True(t, lastSeq == uint64(numToFill))

		// Restart the store to make sure it comes back up with the new index format on disk.
		fs.Stop()
		fs, err = newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
		require_NoError(t, err)
		defer fs.Stop()
	})
}

0 comments on commit 333d684

Please sign in to comment.