Add in support for segmented binary stream snapshots. (#4284)
Streams with many interior deletes were causing issues because the interior
deletes were represented as a sorted []uint64. This could cause snapshots for
streams with R>1 and lots of interior deletes to take up more memory and CPU
than we want.

This new approach introduces three subtypes of delete blocks: an AVL bitmask
tree, a run-length encoding, and the legacy format above. We also take large
interior deletes into account, so that on receiving a snapshot we can skip
over things we already know about.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Jul 5, 2023
2 parents cf39314 + 4d7cd26 commit a393653
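
For illustration, here is a minimal, self-contained Go sketch of the encoding-selection idea described in the commit message above. The type names, threshold, and selection logic are hypothetical stand-ins, not the actual delete-block implementation in this change.

package main

import (
	"fmt"
	"sort"
)

// deleteBlock stands in for the three delete-block subtypes: an AVL bitmask
// tree, a run-length encoding, and the legacy sorted []uint64.
type deleteBlock interface {
	NumDeleted() uint64
}

type legacyList struct{ seqs []uint64 }       // legacy sorted []uint64
type runLength struct{ first, num uint64 }    // one contiguous run of deletes
type bitmaskSet struct{ set map[uint64]bool } // stand-in for the AVL bitmask tree

func (l legacyList) NumDeleted() uint64 { return uint64(len(l.seqs)) }
func (r runLength) NumDeleted() uint64  { return r.num }
func (b bitmaskSet) NumDeleted() uint64 { return uint64(len(b.set)) }

// chooseBlock picks the most compact representation for a set of interior deletes.
func chooseBlock(seqs []uint64) deleteBlock {
	sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] })
	// A single contiguous run compresses to a first sequence plus a length.
	if n := uint64(len(seqs)); n > 0 && seqs[n-1]-seqs[0]+1 == n {
		return runLength{first: seqs[0], num: n}
	}
	// Small, sparse sets can stay in the legacy list form. The threshold here is arbitrary.
	if len(seqs) < 1024 {
		return legacyList{seqs: seqs}
	}
	// Large scattered sets are better served by a bitmask-style structure.
	set := make(map[uint64]bool, len(seqs))
	for _, seq := range seqs {
		set[seq] = true
	}
	return bitmaskSet{set: set}
}

func main() {
	fmt.Printf("%T\n", chooseBlock([]uint64{10, 11, 12, 13})) // runLength
	fmt.Printf("%T\n", chooseBlock([]uint64{5, 9, 42}))       // legacyList
}
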
Showing 13 changed files with 881 additions and 71 deletions.
6 changes: 3 additions & 3 deletions server/avl/norace_test.go
@@ -110,7 +110,7 @@ func TestNoRaceSeqSetEncodeLarge(t *testing.T) {
}

start = time.Now()
ss2, err := Decode(b)
ss2, _, err := Decode(b)
require_NoError(t, err)
if elapsed := time.Since(start); elapsed > expected {
t.Fatalf("Expected decode to take less than %v, got %v", expected, elapsed)
@@ -174,8 +174,8 @@ func TestNoRaceSeqSetRelativeSpeed(t *testing.T) {
t.Fatalf("Expected SequenceSet insert to be no more than 2x slower (%v vs %v)", mapInsertElapsed, ssInsertElapsed)
}

if mapLookupElapsed*2 <= ssLookupElapsed {
t.Fatalf("Expected SequenceSet lookups to be no more than 2x slower (%v vs %v)", mapLookupElapsed, ssLookupElapsed)
if mapLookupElapsed*3 <= ssLookupElapsed {
t.Fatalf("Expected SequenceSet lookups to be no more than 3x slower (%v vs %v)", mapLookupElapsed, ssLookupElapsed)
}
}

105 changes: 93 additions & 12 deletions server/avl/seqset.go
@@ -138,6 +138,15 @@ func (ss *SequenceSet) Heights() (l, r int) {
return l, r
}

// Returns min, max and number of set items.
func (ss *SequenceSet) State() (min, max, num uint64) {
if ss.root == nil {
return 0, 0, 0
}
min, max = ss.MinMax()
return min, max, uint64(ss.Size())
}

// MinMax will return the minimum and maximum values in the set.
func (ss *SequenceSet) MinMax() (min, max uint64) {
if ss.root == nil {
@@ -177,6 +186,23 @@ func (ss *SequenceSet) Clone() *SequenceSet {
return css
}

// Union will union this SequenceSet with ssa.
func (ss *SequenceSet) Union(ssa ...*SequenceSet) {
for _, sa := range ssa {
sa.root.nodeIter(func(n *node) {
for nb, b := range n.bits {
for pos := uint64(0); b != 0; pos++ {
if b&1 == 1 {
seq := n.base + (uint64(nb) * uint64(bitsPerBucket)) + pos
ss.Insert(seq)
}
b >>= 1
}
}
})
}
}

// Union will return a union of all sets.
func Union(ssa ...*SequenceSet) *SequenceSet {
if len(ssa) == 0 {
@@ -200,7 +226,7 @@ const (
// Magic is used to identify the encoded binary state.
magic = uint8(22)
// Version
version = uint8(1)
version = uint8(2)
// hdrLen
hdrLen = 2
// minimum length of an encoded SequenceSet.
@@ -247,30 +273,41 @@ func (ss SequenceSet) Encode(buf []byte) ([]byte, error) {
// ErrBadEncoding is returned when we can not decode properly.
var (
ErrBadEncoding = errors.New("ss: bad encoding")
ErrBadVersion = errors.New("ss: bad version")
ErrSetNotEmpty = errors.New("ss: set not empty")
)

func Decode(buf []byte) (*SequenceSet, error) {
if len(buf) < minLen || buf[0] != magic || buf[1] != version {
return nil, ErrBadEncoding
// Decode returns the sequence set and number of bytes read from the buffer on success.
func Decode(buf []byte) (*SequenceSet, int, error) {
if len(buf) < minLen || buf[0] != magic {
return nil, -1, ErrBadEncoding
}

switch v := buf[1]; v {
case 1:
return decodev1(buf)
case 2:
return decodev2(buf)
default:
return nil, -1, ErrBadVersion
}
}

// Helper to decode v2.
func decodev2(buf []byte) (*SequenceSet, int, error) {
var le = binary.LittleEndian
index := 2
nn := int(le.Uint32(buf[index:]))
sz := int(le.Uint32(buf[index+4:]))
index += 8

expectedLen := minLen + (nn * ((numBuckets+1)*8 + 2))
if len(buf) != expectedLen {
return nil, ErrBadEncoding
if len(buf) < expectedLen {
return nil, -1, ErrBadEncoding
}

nodes := make([]node, nn)
ss, nodes := SequenceSet{size: sz}, make([]node, nn)

ss := SequenceSet{
size: sz,
}
for i := 0; i < nn; i++ {
n := &nodes[i]
n.base = le.Uint64(buf[index:])
@@ -284,7 +321,51 @@ func Decode(buf []byte) (*SequenceSet, error) {
ss.insertNode(n)
}

return &ss, nil
return &ss, index, nil
}

// Helper to decode v1 into v2, which uses 32 fixed buckets vs the original 64.
func decodev1(buf []byte) (*SequenceSet, int, error) {
var le = binary.LittleEndian
index := 2
nn := int(le.Uint32(buf[index:]))
sz := int(le.Uint32(buf[index+4:]))
index += 8

const v1NumBuckets = 64

expectedLen := minLen + (nn * ((v1NumBuckets+1)*8 + 2))
if len(buf) < expectedLen {
return nil, -1, ErrBadEncoding
}

var ss SequenceSet
for i := 0; i < nn; i++ {
base := le.Uint64(buf[index:])
index += 8
for nb := uint64(0); nb < v1NumBuckets; nb++ {
n := le.Uint64(buf[index:])
// Walk all set bits and insert sequences manually for this decode from v1.
for pos := uint64(0); n != 0; pos++ {
if n&1 == 1 {
seq := base + (nb * uint64(bitsPerBucket)) + pos
ss.Insert(seq)
}
n >>= 1
}
index += 8
}
// Skip over encoded height.
index += 2
}

// Sanity check.
if ss.Size() != sz {
return nil, -1, ErrBadEncoding
}

return &ss, index, nil
}

// insertNode places a decoded node into the tree.
@@ -318,7 +399,7 @@ func (ss *SequenceSet) insertNode(n *node) {

const (
bitsPerBucket = 64 // bits in uint64
numBuckets = 64
numBuckets = 32
numEntries = numBuckets * bitsPerBucket
)

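As a usage note, the new Decode signature returns the number of bytes consumed, which lets a caller walk several encoded sets packed back to back in one buffer (for example, segments of a snapshot). Below is a minimal sketch under the assumption that the package is importable as github.com/nats-io/nats-server/v2/server/avl; the surrounding code is illustrative, not from this diff.

package main

import (
	"fmt"

	"github.com/nats-io/nats-server/v2/server/avl"
)

func main() {
	// Build two distinct sets.
	var ss1, ss2 avl.SequenceSet
	for seq := uint64(1); seq <= 10; seq++ {
		ss1.Insert(seq)
		ss2.Insert(seq * 100)
	}

	// Encode each set and pack the encodings back to back.
	b1, err := ss1.Encode(nil)
	if err != nil {
		panic(err)
	}
	b2, err := ss2.Encode(nil)
	if err != nil {
		panic(err)
	}
	buf := append(b1, b2...)

	// Decode reports bytes consumed, so we can advance through the buffer.
	for len(buf) > 0 {
		ss, n, err := avl.Decode(buf)
		if err != nil {
			panic(err)
		}
		min, max, num := ss.State()
		fmt.Println(min, max, num)
		buf = buf[n:]
	}
}
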
47 changes: 46 additions & 1 deletion server/avl/seqset_test.go
@@ -14,14 +14,15 @@
package avl

import (
"encoding/base64"
"math/rand"
"testing"
)

func TestSeqSetBasics(t *testing.T) {
var ss SequenceSet

seqs := []uint64{22, 222, 2222, 2, 2, 4}
seqs := []uint64{22, 222, 2000, 2, 2, 4}
for _, seq := range seqs {
ss.Insert(seq)
require_True(t, ss.Exists(seq))
@@ -235,6 +236,50 @@ func TestSeqSetFirst(t *testing.T) {
}
}

// Test that we can union with nodes vs individual sequence insertion.
func TestSeqSetDistinctUnion(t *testing.T) {
// Distinct sets.
var ss1 SequenceSet
seqs1 := []uint64{1, 10, 100, 200}
for _, seq := range seqs1 {
ss1.Insert(seq)
}

var ss2 SequenceSet
seqs2 := []uint64{5000, 6100, 6200, 6222}
for _, seq := range seqs2 {
ss2.Insert(seq)
}

ss := ss1.Clone()
allSeqs := append(seqs1, seqs2...)

ss.Union(&ss2)
require_True(t, ss.Size() == len(allSeqs))
for _, seq := range allSeqs {
require_True(t, ss.Exists(seq))
}
}

func TestSeqSetDecodeV1(t *testing.T) {
// Encoding from v1 which was 64 buckets.
seqs := []uint64{22, 222, 2222, 222_222, 2_222_222}
encStr := `
FgEDAAAABQAAAABgAwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAADgIQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAA==
`

enc, err := base64.StdEncoding.DecodeString(encStr)
require_NoError(t, err)

ss, _, err := Decode(enc)
require_NoError(t, err)

require_True(t, ss.Size() == len(seqs))
for _, seq := range seqs {
require_True(t, ss.Exists(seq))
}
}

func require_NoError(t *testing.T, err error) {
t.Helper()
if err != nil {
99 changes: 76 additions & 23 deletions server/events.go
@@ -193,18 +193,53 @@ type ServerID struct {
ID string `json:"id"`
}

// Type for our server capabilities.
type ServerCapability uint64

// ServerInfo identifies remote servers.
type ServerInfo struct {
Name string `json:"name"`
Host string `json:"host"`
ID string `json:"id"`
Cluster string `json:"cluster,omitempty"`
Domain string `json:"domain,omitempty"`
Version string `json:"ver"`
Tags []string `json:"tags,omitempty"`
Seq uint64 `json:"seq"`
JetStream bool `json:"jetstream"`
Time time.Time `json:"time"`
Name string `json:"name"`
Host string `json:"host"`
ID string `json:"id"`
Cluster string `json:"cluster,omitempty"`
Domain string `json:"domain,omitempty"`
Version string `json:"ver"`
Tags []string `json:"tags,omitempty"`
// Whether JetStream is enabled (deprecated in favor of the `ServerCapability`).
JetStream bool `json:"jetstream"`
// Generic capability flags
Flags ServerCapability
// Sequence and Time from the remote server for this message.
Seq uint64 `json:"seq"`
Time time.Time `json:"time"`
}

const (
JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled.
BinaryStreamSnapshot // New stream snapshot capability.
)

// Set JetStream capability.
func (si *ServerInfo) SetJetStreamEnabled() {
si.Flags |= JetStreamEnabled
// Still set old version.
si.JetStream = true
}

// JetStreamEnabled indicates whether or not we have JetStream enabled.
func (si *ServerInfo) JetStreamEnabled() bool {
// Take into account old version.
return si.Flags&JetStreamEnabled != 0 || si.JetStream
}

// Set binary stream snapshot capability.
func (si *ServerInfo) SetBinaryStreamSnapshot() {
si.Flags |= BinaryStreamSnapshot
}

// BinaryStreamSnapshot indicates whether or not we have binary stream snapshot capabilities.
func (si *ServerInfo) BinaryStreamSnapshot() bool {
return si.Flags&BinaryStreamSnapshot != 0
}

// ClientInfo is detailed information about the client forming a connection.
@@ -391,17 +426,21 @@ RESET:
case <-sendq.ch:
msgs := sendq.pop()
for _, pm := range msgs {
if pm.si != nil {
pm.si.Name = servername
pm.si.Domain = domain
pm.si.Host = host
pm.si.Cluster = cluster
pm.si.ID = id
pm.si.Seq = atomic.AddUint64(seqp, 1)
pm.si.Version = VERSION
pm.si.Time = time.Now().UTC()
pm.si.JetStream = js
pm.si.Tags = tags
if si := pm.si; si != nil {
si.Name = servername
si.Domain = domain
si.Host = host
si.Cluster = cluster
si.ID = id
si.Seq = atomic.AddUint64(seqp, 1)
si.Version = VERSION
si.Time = time.Now().UTC()
si.Tags = tags
if js {
// New capability based flags.
si.SetJetStreamEnabled()
si.SetBinaryStreamSnapshot()
}
}
var b []byte
if pm.msg != nil {
@@ -1418,7 +1457,9 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
si.Tags,
cfg,
stats,
false, si.JetStream,
false,
si.JetStreamEnabled(),
si.BinaryStreamSnapshot(),
})
}

@@ -1454,7 +1495,19 @@ func (s *Server) processNewServer(si *ServerInfo) {
node := getHash(si.Name)
// Only update if non-existent
if _, ok := s.nodeToInfo.Load(node); !ok {
s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Version, si.Cluster, si.Domain, si.ID, si.Tags, nil, nil, false, si.JetStream})
s.nodeToInfo.Store(node, nodeInfo{
si.Name,
si.Version,
si.Cluster,
si.Domain,
si.ID,
si.Tags,
nil,
nil,
false,
si.JetStreamEnabled(),
si.BinaryStreamSnapshot(),
})
}
}
// Announce ourselves..
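To make the backward-compatible capability handling in events.go concrete, here is a small self-contained sketch of the flag idiom. The names mirror the diff, but this is illustrative code, not the server package itself.

package main

import "fmt"

// ServerCapability mirrors the bit-flag type introduced above.
type ServerCapability uint64

const (
	JetStreamEnabled     ServerCapability = 1 << iota // server has JetStream enabled
	BinaryStreamSnapshot                              // server understands binary stream snapshots
)

// ServerInfo keeps the legacy boolean alongside the new capability bits.
type ServerInfo struct {
	JetStream bool             // legacy field, still set for older servers
	Flags     ServerCapability // new capability flags
}

// jetStreamEnabled honors the new flag but falls back to the legacy boolean.
func jetStreamEnabled(si ServerInfo) bool {
	return si.Flags&JetStreamEnabled != 0 || si.JetStream
}

func main() {
	oldServer := ServerInfo{JetStream: true}                                // pre-capability peer
	newServer := ServerInfo{Flags: JetStreamEnabled | BinaryStreamSnapshot} // current peer

	fmt.Println(jetStreamEnabled(oldServer))               // true, via the legacy bool
	fmt.Println(jetStreamEnabled(newServer))               // true, via the flags
	fmt.Println(newServer.Flags&BinaryStreamSnapshot != 0) // true, supports binary snapshots
	fmt.Println(oldServer.Flags&BinaryStreamSnapshot != 0) // false, must fall back to legacy snapshots
}

This is the same pattern remoteServerUpdate and processNewServer above rely on when they record si.JetStreamEnabled() and si.BinaryStreamSnapshot() into nodeInfo.
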
