
Commit

[IMPROVED] Delete blocks performance (#4371)
For the memory store, track deleted sequences with a single avl.SequenceSet
dmap for now, instead of the old method of calculating them on the fly.

For fileStore, we were trying to be too smart to save space at the
expense of encoding time, so revert back to the simpler version, which is
roughly 100x faster.
 
The size of the encoding may be a bit bigger than we wanted, but we prefer
speed over size.

Signed-off-by: Derek Collison <derek@nats.io>
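
The memory-store half of the change keeps the delete state up to date incrementally, so the snapshot path can encode it directly rather than rebuilding a set from the full deleted list each time. Below is a minimal standalone sketch of that bookkeeping using plain Go types; the real code uses memStore and avl.SequenceSet, not these toy stand-ins.

package main

import (
	"fmt"
	"sort"
)

// deleteSet stands in for the server's avl.SequenceSet: it records which
// interior sequences have been removed.
type deleteSet map[uint64]struct{}

// store is a toy memory store: sequences firstSeq..lastSeq with some
// interior sequences deleted.
type store struct {
	firstSeq, lastSeq uint64
	msgs              map[uint64]string
	dmap              deleteSet
}

// removeMsg deletes a message and records the hole, then advances the first
// sequence past any leading holes and drops them from the delete set, since
// anything below firstSeq is implicitly gone.
func (s *store) removeMsg(seq uint64) {
	if _, ok := s.msgs[seq]; !ok {
		return
	}
	delete(s.msgs, seq)
	s.dmap[seq] = struct{}{}
	for {
		if _, ok := s.dmap[s.firstSeq]; !ok {
			break
		}
		delete(s.dmap, s.firstSeq)
		s.firstSeq++
	}
}

// deletedSeqs returns the interior holes in order, ready to be written into
// a snapshot with no recomputation.
func (s *store) deletedSeqs() []uint64 {
	out := make([]uint64, 0, len(s.dmap))
	for seq := range s.dmap {
		out = append(out, seq)
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

func main() {
	s := &store{
		firstSeq: 1, lastSeq: 5,
		msgs: map[uint64]string{1: "a", 2: "b", 3: "c", 4: "d", 5: "e"},
		dmap: deleteSet{},
	}
	s.removeMsg(3)
	s.removeMsg(1) // the floor advances to 2, so 1 is no longer tracked
	fmt.Println(s.firstSeq, s.deletedSeqs()) // 2 [3]
}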
derekcollison committed Aug 7, 2023
2 parents abe0791 + 75e1171 commit 6ca7887
Showing 5 changed files with 189 additions and 109 deletions.
93 changes: 37 additions & 56 deletions server/filestore.go
@@ -6887,8 +6887,21 @@ func (fs *fileStore) fileStoreConfig() FileStoreConfig {
return fs.fcfg
}

// When we will write a run length encoded record vs adding to the existing avl.SequenceSet.
const rlThresh = 4096
// Read lock all existing message blocks.
// Lock held on entry.
func (fs *fileStore) readLockAllMsgBlocks() {
for _, mb := range fs.blks {
mb.mu.RLock()
}
}

// Read unlock all existing message blocks.
// Lock held on entry.
func (fs *fileStore) readUnlockAllMsgBlocks() {
for _, mb := range fs.blks {
mb.mu.RUnlock()
}
}

// Binary encoded state snapshot, >= v2.10 server.
func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) {
@@ -6919,6 +6932,10 @@ func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) {

if numDeleted > 0 {
var scratch [4 * 1024]byte

fs.readLockAllMsgBlocks()
defer fs.readUnlockAllMsgBlocks()

for _, db := range fs.deleteBlocks() {
switch db := db.(type) {
case *DeleteRange:
@@ -6943,68 +6960,23 @@ func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) {
return b, nil
}

// Lock should be held.
// We used to be more sophisticated to save memory, but speed is more important.
// All blocks should be at least read locked.
func (fs *fileStore) deleteBlocks() DeleteBlocks {
var (
dbs DeleteBlocks
adm *avl.SequenceSet
prevLast uint64
)
var dbs DeleteBlocks
var prevLast uint64

for _, mb := range fs.blks {
mb.mu.RLock()
// Detect if we have a gap between these blocks.
if prevLast > 0 && prevLast+1 != mb.first.seq {
// Detect if we need to encode a run length encoding here.
if gap := mb.first.seq - prevLast - 1; gap > rlThresh {
// Check if we have a running adm, if so write that out first, or if contiguous update rle params.
if min, max, num := adm.State(); num > 0 {
// Check if we are all contiguous.
if num == max-min+1 {
prevLast, gap = min-1, mb.first.seq-min
} else {
dbs = append(dbs, adm)
}
// Always nil out here.
adm = nil
}
dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: gap})
} else {
// Common dmap
if adm == nil {
adm = &avl.SequenceSet{}
adm.SetInitialMin(prevLast + 1)
}
for seq := prevLast + 1; seq < mb.first.seq; seq++ {
adm.Insert(seq)
}
}
gap := mb.first.seq - prevLast - 1
dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: gap})
}
if min, max, num := mb.dmap.State(); num > 0 {
// Check in case the mb's dmap is contiguous and over our threshold.
if num == max-min+1 && num > rlThresh {
// Need to write out adm if it exists.
if adm != nil && adm.Size() > 0 {
dbs = append(dbs, adm)
adm = nil
}
dbs = append(dbs, &DeleteRange{First: min, Num: max - min + 1})
} else {
// Aggregated dmap
if adm == nil {
adm = mb.dmap.Clone()
} else {
adm.Union(&mb.dmap)
}
}
if mb.dmap.Size() > 0 {
dbs = append(dbs, &mb.dmap)
}
prevLast = mb.last.seq
mb.mu.RUnlock()
}

if adm != nil {
dbs = append(dbs, adm)
}

return dbs
}
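
The comment above captures the change of approach: instead of merging per-block delete maps into one aggregated set and run-length encoding large contiguous stretches, a DeleteRange is emitted for each gap between blocks and each block's dmap is passed through untouched. A standalone sketch of that strategy with toy types follows; block, deleteRange, and simpleDeleteBlocks are illustrative stand-ins, not the server's msgBlock, DeleteRange, or avl.SequenceSet.

package main

import "fmt"

// block stands in for a message block: its first/last sequence plus the
// sequences deleted inside it.
type block struct {
	first, last uint64
	deleted     []uint64
}

// deleteRange is a run-length style marker: First..First+Num-1 are deleted.
type deleteRange struct {
	First, Num uint64
}

// simpleDeleteBlocks mirrors the reverted approach: one range per gap
// between blocks, plus each block's own delete set as-is, with no
// per-sequence inserts or unions.
func simpleDeleteBlocks(blks []block) (ranges []deleteRange, dmaps [][]uint64) {
	var prevLast uint64
	for _, mb := range blks {
		if prevLast > 0 && prevLast+1 != mb.first {
			ranges = append(ranges, deleteRange{First: prevLast + 1, Num: mb.first - prevLast - 1})
		}
		if len(mb.deleted) > 0 {
			dmaps = append(dmaps, mb.deleted)
		}
		prevLast = mb.last
	}
	return ranges, dmaps
}

func main() {
	blks := []block{
		{first: 1, last: 100, deleted: []uint64{7, 8, 42}},
		{first: 501, last: 600, deleted: []uint64{550}}, // gap covers 101..500
	}
	ranges, dmaps := simpleDeleteBlocks(blks)
	fmt.Println(ranges) // [{101 400}]
	fmt.Println(dmaps)  // [[7 8 42] [550]]
}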

Expand All @@ -7013,9 +6985,13 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) {
if len(dbs) == 0 {
return
}

fs.mu.Lock()
defer fs.mu.Unlock()

var needsCheck DeleteBlocks

fs.readLockAllMsgBlocks()
mdbs := fs.deleteBlocks()
for i, db := range dbs {
// If the block is same as what we have we can skip.
@@ -7027,6 +7003,11 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) {
}
}
// Need to insert these.
needsCheck = append(needsCheck, db)
}
fs.readUnlockAllMsgBlocks()

for _, db := range needsCheck {
db.Range(func(dseq uint64) bool {
fs.removeMsg(dseq, false, true, false)
return true
37 changes: 0 additions & 37 deletions server/filestore_test.go
@@ -30,7 +30,6 @@ import (
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"testing"
"time"
@@ -5707,39 +5706,3 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
// Make sure it was updated properly.
require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false})
}

func TestFileStoreStreamEncoderDecoder(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"*"}, MaxMsgsPer: 2, Storage: FileStorage},
)
require_NoError(t, err)
defer fs.Stop()

const seed = 2222222
prand := rand.New(rand.NewSource(seed))

tick := time.NewTicker(time.Second)
defer tick.Stop()
done := time.NewTimer(10 * time.Second)

msg := bytes.Repeat([]byte("ABC"), 33) // ~100bytes

for running := true; running; {
select {
case <-tick.C:
var state StreamState
fs.FastState(&state)
snap, err := fs.EncodedStreamState(0)
require_NoError(t, err)
ss, err := DecodeStreamState(snap)
require_True(t, len(ss.Deleted) > 0)
require_NoError(t, err)
case <-done.C:
running = false
default:
key := strconv.Itoa(prand.Intn(256_000))
fs.StoreMsg(key, nil, msg)
}
}
}
53 changes: 39 additions & 14 deletions server/memstore.go
@@ -31,6 +31,7 @@ type memStore struct {
state StreamState
msgs map[uint64]*StoreMsg
fss map[string]*SimpleState
dmap avl.SequenceSet
maxp int64
scb StorageUpdateHandler
ageChk *time.Timer
@@ -1005,6 +1006,7 @@ func (ms *memStore) updateFirstSeq(seq uint64) {
break
}
}
oldFirst := ms.state.FirstSeq
if nsm != nil {
ms.state.FirstSeq = nsm.seq
ms.state.FirstTime = time.Unix(0, nsm.ts).UTC()
@@ -1013,6 +1015,17 @@ func (ms *memStore) updateFirstSeq(seq uint64) {
ms.state.FirstSeq = ms.state.LastSeq + 1
ms.state.FirstTime = time.Time{}
}

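// Drop any sequences that just fell below the new first sequence from the
// delete map; once it drains, re-seed its initial minimum at the new floor.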
if oldFirst == ms.state.FirstSeq-1 {
ms.dmap.Delete(oldFirst)
} else {
for seq := oldFirst; seq < ms.state.FirstSeq; seq++ {
ms.dmap.Delete(seq)
}
}
if ms.dmap.IsEmpty() {
ms.dmap.SetInitialMin(ms.state.FirstSeq)
}
}

// Remove a seq from the fss and select new first.
@@ -1071,6 +1084,7 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
}
ms.state.Bytes -= ss
}
ms.dmap.Insert(seq)
ms.updateFirstSeq(seq)

if secure {
@@ -1230,29 +1244,30 @@ func (ms *memStore) Snapshot(_ time.Duration, _, _ bool) (*SnapshotResult, error

// Binary encoded state snapshot, >= v2.10 server.
func (ms *memStore) EncodedStreamState(failed uint64) ([]byte, error) {
// FIXME(dlc) - Don't calculate deleted on the fly, keep delete blocks.
state := ms.State()
ms.mu.RLock()
defer ms.mu.RUnlock()

// Quick calculate num deleted.
numDeleted := int((ms.state.LastSeq - ms.state.FirstSeq + 1) - ms.state.Msgs)
if numDeleted < 0 {
numDeleted = 0
}

// Encoded is Msgs, Bytes, FirstSeq, LastSeq, Failed, NumDeleted and optional DeletedBlocks
var buf [1024]byte
buf[0], buf[1] = streamStateMagic, streamStateVersion
n := hdrLen
n += binary.PutUvarint(buf[n:], state.Msgs)
n += binary.PutUvarint(buf[n:], state.Bytes)
n += binary.PutUvarint(buf[n:], state.FirstSeq)
n += binary.PutUvarint(buf[n:], state.LastSeq)
n += binary.PutUvarint(buf[n:], ms.state.Msgs)
n += binary.PutUvarint(buf[n:], ms.state.Bytes)
n += binary.PutUvarint(buf[n:], ms.state.FirstSeq)
n += binary.PutUvarint(buf[n:], ms.state.LastSeq)
n += binary.PutUvarint(buf[n:], failed)
n += binary.PutUvarint(buf[n:], uint64(state.NumDeleted))
n += binary.PutUvarint(buf[n:], uint64(numDeleted))

b := buf[0:n]

if state.NumDeleted > 0 {
var ss avl.SequenceSet
ss.SetInitialMin(state.Deleted[0])
for _, seq := range state.Deleted {
ss.Insert(seq)
}
buf, err := ss.Encode(nil)
if numDeleted > 0 {
buf, err := ms.dmap.Encode(nil)
if err != nil {
return nil, err
}
@@ -1264,6 +1279,16 @@ func (ms *memStore) EncodedStreamState(failed uint64) ([]byte, error) {

// SyncDeleted will make sure this stream has same deleted state as dbs.
func (ms *memStore) SyncDeleted(dbs DeleteBlocks) {
// For now we share one dmap, so if we have one entry here check if states are the same.
// Note this will work for any DeleteBlock type, but we expect this to be a dmap too.
if len(dbs) == 1 {
ms.mu.RLock()
min, max, num := ms.dmap.State()
ms.mu.RUnlock()
if pmin, pmax, pnum := dbs[0].State(); pmin == min && pmax == max && pnum == num {
return
}
}
for _, db := range dbs {
db.Range(func(dseq uint64) bool {
ms.RemoveMsg(dseq)
38 changes: 38 additions & 0 deletions server/memstore_test.go
@@ -754,3 +754,41 @@ func TestMemStoreInitialFirstSeq(t *testing.T) {
t.Fatalf("Expected last seq 1001, got %d", state.LastSeq)
}
}

func TestMemStoreDeleteBlocks(t *testing.T) {
cfg := &StreamConfig{
Name: "zzz",
Subjects: []string{"*"},
Storage: MemoryStorage,
}
ms, err := newMemStore(cfg)
require_NoError(t, err)

// Put in 10_000 msgs.
total := 10_000
for i := 0; i < total; i++ {
_, _, err := ms.StoreMsg("A", nil, []byte("OK"))
require_NoError(t, err)
}

// Now pick 5k random sequences.
delete := 5000
deleteMap := make(map[int]struct{}, delete)
for len(deleteMap) < delete {
deleteMap[rand.Intn(total)+1] = struct{}{}
}
// Now remove them.
for seq := range deleteMap {
ms.RemoveMsg(uint64(seq))
}

var state StreamState
ms.FastState(&state)

// For now we just track via one dmap.
ms.mu.RLock()
dmap := ms.dmap.Clone()
ms.mu.RUnlock()

require_True(t, dmap.Size() == state.NumDeleted)
}
