Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Delete blocks performance #4371

Merged
merged 3 commits into from Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
93 changes: 37 additions & 56 deletions server/filestore.go
Expand Up @@ -6887,8 +6887,21 @@ func (fs *fileStore) fileStoreConfig() FileStoreConfig {
return fs.fcfg
}

// When we will write a run length encoded record vs adding to the existing avl.SequenceSet.
const rlThresh = 4096
// Read lock all existing message blocks.
// Lock held on entry.
func (fs *fileStore) readLockAllMsgBlocks() {
for _, mb := range fs.blks {
mb.mu.RLock()
}
}

// Read unlock all existing message blocks.
// Lock held on entry.
func (fs *fileStore) readUnlockAllMsgBlocks() {
for _, mb := range fs.blks {
mb.mu.RUnlock()
}
}

// Binary encoded state snapshot, >= v2.10 server.
func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) {
Expand Down Expand Up @@ -6919,6 +6932,10 @@ func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) {

if numDeleted > 0 {
var scratch [4 * 1024]byte

fs.readLockAllMsgBlocks()
defer fs.readUnlockAllMsgBlocks()

for _, db := range fs.deleteBlocks() {
switch db := db.(type) {
case *DeleteRange:
Expand All @@ -6943,68 +6960,23 @@ func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) {
return b, nil
}

// Lock should be held.
// We used to be more sophisticated to save memory, but speed is more important.
// All blocks should be at least read locked.
func (fs *fileStore) deleteBlocks() DeleteBlocks {
var (
dbs DeleteBlocks
adm *avl.SequenceSet
prevLast uint64
)
var dbs DeleteBlocks
var prevLast uint64

for _, mb := range fs.blks {
mb.mu.RLock()
// Detect if we have a gap between these blocks.
if prevLast > 0 && prevLast+1 != mb.first.seq {
// Detect if we need to encode a run length encoding here.
if gap := mb.first.seq - prevLast - 1; gap > rlThresh {
// Check if we have a running adm, if so write that out first, or if contigous update rle params.
if min, max, num := adm.State(); num > 0 {
// Check if we are all contingous.
if num == max-min+1 {
prevLast, gap = min-1, mb.first.seq-min
} else {
dbs = append(dbs, adm)
}
// Always nil out here.
adm = nil
}
dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: gap})
} else {
// Common dmap
if adm == nil {
adm = &avl.SequenceSet{}
adm.SetInitialMin(prevLast + 1)
}
for seq := prevLast + 1; seq < mb.first.seq; seq++ {
adm.Insert(seq)
}
}
gap := mb.first.seq - prevLast - 1
dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: gap})
}
if min, max, num := mb.dmap.State(); num > 0 {
// Check in case the mb's dmap is contiguous and over our threshold.
if num == max-min+1 && num > rlThresh {
// Need to write out adm if it exists.
if adm != nil && adm.Size() > 0 {
dbs = append(dbs, adm)
adm = nil
}
dbs = append(dbs, &DeleteRange{First: min, Num: max - min + 1})
} else {
// Aggregated dmap
if adm == nil {
adm = mb.dmap.Clone()
} else {
adm.Union(&mb.dmap)
}
}
if mb.dmap.Size() > 0 {
dbs = append(dbs, &mb.dmap)
}
prevLast = mb.last.seq
mb.mu.RUnlock()
}

if adm != nil {
dbs = append(dbs, adm)
}

return dbs
}

Expand All @@ -7013,9 +6985,13 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) {
if len(dbs) == 0 {
return
}

fs.mu.Lock()
defer fs.mu.Unlock()

var needsCheck DeleteBlocks

fs.readLockAllMsgBlocks()
mdbs := fs.deleteBlocks()
for i, db := range dbs {
// If the block is same as what we have we can skip.
Expand All @@ -7027,6 +7003,11 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) {
}
}
// Need to insert these.
needsCheck = append(needsCheck, db)
}
fs.readUnlockAllMsgBlocks()

for _, db := range needsCheck {
db.Range(func(dseq uint64) bool {
fs.removeMsg(dseq, false, true, false)
return true
Expand Down
37 changes: 0 additions & 37 deletions server/filestore_test.go
Expand Up @@ -30,7 +30,6 @@ import (
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -5707,39 +5706,3 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
// Make sure it was update properly.
require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false})
}

func TestFileStoreStreamEncoderDecoder(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"*"}, MaxMsgsPer: 2, Storage: FileStorage},
)
require_NoError(t, err)
defer fs.Stop()

const seed = 2222222
prand := rand.New(rand.NewSource(seed))

tick := time.NewTicker(time.Second)
defer tick.Stop()
done := time.NewTimer(10 * time.Second)

msg := bytes.Repeat([]byte("ABC"), 33) // ~100bytes

for running := true; running; {
select {
case <-tick.C:
var state StreamState
fs.FastState(&state)
snap, err := fs.EncodedStreamState(0)
require_NoError(t, err)
ss, err := DecodeStreamState(snap)
require_True(t, len(ss.Deleted) > 0)
require_NoError(t, err)
case <-done.C:
running = false
default:
key := strconv.Itoa(prand.Intn(256_000))
fs.StoreMsg(key, nil, msg)
}
}
}
53 changes: 39 additions & 14 deletions server/memstore.go
Expand Up @@ -31,6 +31,7 @@ type memStore struct {
state StreamState
msgs map[uint64]*StoreMsg
fss map[string]*SimpleState
dmap avl.SequenceSet
maxp int64
scb StorageUpdateHandler
ageChk *time.Timer
Expand Down Expand Up @@ -1005,6 +1006,7 @@ func (ms *memStore) updateFirstSeq(seq uint64) {
break
}
}
oldFirst := ms.state.FirstSeq
if nsm != nil {
ms.state.FirstSeq = nsm.seq
ms.state.FirstTime = time.Unix(0, nsm.ts).UTC()
Expand All @@ -1013,6 +1015,17 @@ func (ms *memStore) updateFirstSeq(seq uint64) {
ms.state.FirstSeq = ms.state.LastSeq + 1
ms.state.FirstTime = time.Time{}
}

if oldFirst == ms.state.FirstSeq-1 {
ms.dmap.Delete(oldFirst)
} else {
for seq := oldFirst; seq < ms.state.FirstSeq; seq++ {
ms.dmap.Delete(seq)
}
}
if ms.dmap.IsEmpty() {
ms.dmap.SetInitialMin(ms.state.FirstSeq)
}
}

// Remove a seq from the fss and select new first.
Expand Down Expand Up @@ -1071,6 +1084,7 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
}
ms.state.Bytes -= ss
}
ms.dmap.Insert(seq)
ms.updateFirstSeq(seq)

if secure {
Expand Down Expand Up @@ -1230,29 +1244,30 @@ func (ms *memStore) Snapshot(_ time.Duration, _, _ bool) (*SnapshotResult, error

// Binary encoded state snapshot, >= v2.10 server.
func (ms *memStore) EncodedStreamState(failed uint64) ([]byte, error) {
// FIXME(dlc) - Don't calculate deleted on the fly, keep delete blocks.
state := ms.State()
ms.mu.RLock()
defer ms.mu.RUnlock()

// Quick calculate num deleted.
numDeleted := int((ms.state.LastSeq - ms.state.FirstSeq + 1) - ms.state.Msgs)
if numDeleted < 0 {
numDeleted = 0
}

// Encoded is Msgs, Bytes, FirstSeq, LastSeq, Failed, NumDeleted and optional DeletedBlocks
var buf [1024]byte
buf[0], buf[1] = streamStateMagic, streamStateVersion
n := hdrLen
n += binary.PutUvarint(buf[n:], state.Msgs)
n += binary.PutUvarint(buf[n:], state.Bytes)
n += binary.PutUvarint(buf[n:], state.FirstSeq)
n += binary.PutUvarint(buf[n:], state.LastSeq)
n += binary.PutUvarint(buf[n:], ms.state.Msgs)
n += binary.PutUvarint(buf[n:], ms.state.Bytes)
n += binary.PutUvarint(buf[n:], ms.state.FirstSeq)
n += binary.PutUvarint(buf[n:], ms.state.LastSeq)
n += binary.PutUvarint(buf[n:], failed)
n += binary.PutUvarint(buf[n:], uint64(state.NumDeleted))
n += binary.PutUvarint(buf[n:], uint64(numDeleted))

b := buf[0:n]

if state.NumDeleted > 0 {
var ss avl.SequenceSet
ss.SetInitialMin(state.Deleted[0])
for _, seq := range state.Deleted {
ss.Insert(seq)
}
buf, err := ss.Encode(nil)
if numDeleted > 0 {
buf, err := ms.dmap.Encode(nil)
if err != nil {
return nil, err
}
Expand All @@ -1264,6 +1279,16 @@ func (ms *memStore) EncodedStreamState(failed uint64) ([]byte, error) {

// SyncDeleted will make sure this stream has same deleted state as dbs.
func (ms *memStore) SyncDeleted(dbs DeleteBlocks) {
// For now we share one dmap, so if we have one entry here check if states are the same.
// Note this will work for any DeleteBlock type, but we expect this to be a dmap too.
if len(dbs) == 1 {
ms.mu.RLock()
min, max, num := ms.dmap.State()
ms.mu.RUnlock()
if pmin, pmax, pnum := dbs[0].State(); pmin == min && pmax == max && pnum == num {
return
}
}
for _, db := range dbs {
db.Range(func(dseq uint64) bool {
ms.RemoveMsg(dseq)
Expand Down
38 changes: 38 additions & 0 deletions server/memstore_test.go
Expand Up @@ -754,3 +754,41 @@ func TestMemStoreInitialFirstSeq(t *testing.T) {
t.Fatalf("Expected last seq 1001, got %d", state.LastSeq)
}
}

func TestMemStoreDeleteBlocks(t *testing.T) {
cfg := &StreamConfig{
Name: "zzz",
Subjects: []string{"*"},
Storage: MemoryStorage,
}
ms, err := newMemStore(cfg)
require_NoError(t, err)

// Put in 10_000 msgs.
total := 10_000
for i := 0; i < total; i++ {
_, _, err := ms.StoreMsg("A", nil, []byte("OK"))
require_NoError(t, err)
}

// Now pick 5k random sequences.
delete := 5000
deleteMap := make(map[int]struct{}, delete)
for len(deleteMap) < delete {
deleteMap[rand.Intn(total)+1] = struct{}{}
}
// Now remove?
for seq := range deleteMap {
ms.RemoveMsg(uint64(seq))
}

var state StreamState
ms.FastState(&state)

// For now we just track via one dmap.
ms.mu.RLock()
dmap := ms.dmap.Clone()
ms.mu.RUnlock()

require_True(t, dmap.Size() == state.NumDeleted)
}