Add in support for segmented binary stream snapshots.
Streams with many interior deletes were causing issues because the interior deletes were represented as a sorted []uint64.
This approach introduces three subtypes of delete blocks: an AVL bitmask tree, a run-length encoding, and the legacy sorted format above.
We also account for large interior deletes so that, on receiving a snapshot, we can skip parts we already know about.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Jul 3, 2023
1 parent cf39314 commit 4d7cd26
Showing 13 changed files with 881 additions and 71 deletions.
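
For context on the approach described in the commit message, here is a minimal, self-contained sketch of the idea of picking the cheapest of three delete-block representations. The `pickDeleteBlockEncoding` helper, its names, and its byte-cost model are illustrative assumptions for this sketch only; they are not the filestore code in this commit.

```go
package main

import "fmt"

// pickDeleteBlockEncoding is a hypothetical illustration of the idea in the
// commit message: given the (sorted) interior deletes of a stream, estimate
// the cost of three representations and pick the cheapest. The names and
// byte costs are assumptions for this sketch, not the actual filestore code.
//   1. legacy: a sorted []uint64 of every deleted sequence (8 bytes each)
//   2. run-length: (start, length) pairs per consecutive run (16 bytes each)
//   3. bitmask: fixed 64-bit buckets over the deleted range (8 bytes per bucket)
func pickDeleteBlockEncoding(deleted []uint64) string {
	if len(deleted) == 0 {
		return "none"
	}
	legacy := 8 * len(deleted)

	runs := 1
	for i := 1; i < len(deleted); i++ {
		if deleted[i] != deleted[i-1]+1 {
			runs++
		}
	}
	rle := 16 * runs

	span := deleted[len(deleted)-1] - deleted[0] + 1
	bitmask := 8 * int((span+63)/64)

	switch {
	case rle <= legacy && rle <= bitmask:
		return "run-length"
	case bitmask <= legacy:
		return "bitmask"
	default:
		return "legacy"
	}
}

func main() {
	// A dense block of interior deletes compresses well as runs or bitmasks,
	// while a few scattered deletes stay cheapest in the legacy form.
	dense := make([]uint64, 0, 10_000)
	for seq := uint64(100); seq < 10_100; seq++ {
		dense = append(dense, seq)
	}
	fmt.Println(pickDeleteBlockEncoding(dense))                     // run-length
	fmt.Println(pickDeleteBlockEncoding([]uint64{5, 9000, 70_000})) // legacy
}
```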
6 changes: 3 additions & 3 deletions server/avl/norace_test.go
@@ -110,7 +110,7 @@ func TestNoRaceSeqSetEncodeLarge(t *testing.T) {
}

start = time.Now()
ss2, err := Decode(b)
ss2, _, err := Decode(b)
require_NoError(t, err)
if elapsed := time.Since(start); elapsed > expected {
t.Fatalf("Expected decode to take less than %v, got %v", expected, elapsed)
@@ -174,8 +174,8 @@ func TestNoRaceSeqSetRelativeSpeed(t *testing.T) {
t.Fatalf("Expected SequenceSet insert to be no more than 2x slower (%v vs %v)", mapInsertElapsed, ssInsertElapsed)
}

if mapLookupElapsed*2 <= ssLookupElapsed {
t.Fatalf("Expected SequenceSet lookups to be no more than 2x slower (%v vs %v)", mapLookupElapsed, ssLookupElapsed)
if mapLookupElapsed*3 <= ssLookupElapsed {
t.Fatalf("Expected SequenceSet lookups to be no more than 3x slower (%v vs %v)", mapLookupElapsed, ssLookupElapsed)
}
}

105 changes: 93 additions & 12 deletions server/avl/seqset.go
@@ -138,6 +138,15 @@ func (ss *SequenceSet) Heights() (l, r int) {
return l, r
}

// Returns min, max and number of set items.
func (ss *SequenceSet) State() (min, max, num uint64) {
if ss.root == nil {
return 0, 0, 0
}
min, max = ss.MinMax()
return min, max, uint64(ss.Size())
}

// MinMax will return the minimum and maximum values in the set.
func (ss *SequenceSet) MinMax() (min, max uint64) {
if ss.root == nil {
@@ -177,6 +186,23 @@ func (ss *SequenceSet) Clone() *SequenceSet {
return css
}

// Union will union this SequenceSet with ssa.
func (ss *SequenceSet) Union(ssa ...*SequenceSet) {
for _, sa := range ssa {
sa.root.nodeIter(func(n *node) {
for nb, b := range n.bits {
for pos := uint64(0); b != 0; pos++ {
if b&1 == 1 {
seq := n.base + (uint64(nb) * uint64(bitsPerBucket)) + pos
ss.Insert(seq)
}
b >>= 1
}
}
})
}
}

// Union will return a union of all sets.
func Union(ssa ...*SequenceSet) *SequenceSet {
if len(ssa) == 0 {
@@ -200,7 +226,7 @@ const (
// Magic is used to identify the encoded binary state.
magic = uint8(22)
// Version
version = uint8(1)
version = uint8(2)
// hdrLen
hdrLen = 2
// minimum length of an encoded SequenceSet.
@@ -247,30 +273,41 @@ func (ss SequenceSet) Encode(buf []byte) ([]byte, error) {
// ErrBadEncoding is returned when we can not decode properly.
var (
ErrBadEncoding = errors.New("ss: bad encoding")
ErrBadVersion = errors.New("ss: bad version")
ErrSetNotEmpty = errors.New("ss: set not empty")
)

func Decode(buf []byte) (*SequenceSet, error) {
if len(buf) < minLen || buf[0] != magic || buf[1] != version {
return nil, ErrBadEncoding
// Decode returns the sequence set and number of bytes read from the buffer on success.
func Decode(buf []byte) (*SequenceSet, int, error) {
if len(buf) < minLen || buf[0] != magic {
return nil, -1, ErrBadEncoding
}

switch v := buf[1]; v {
case 1:
return decodev1(buf)
case 2:
return decodev2(buf)
default:
return nil, -1, ErrBadVersion
}
}

// Helper to decode v2.
func decodev2(buf []byte) (*SequenceSet, int, error) {
var le = binary.LittleEndian
index := 2
nn := int(le.Uint32(buf[index:]))
sz := int(le.Uint32(buf[index+4:]))
index += 8

expectedLen := minLen + (nn * ((numBuckets+1)*8 + 2))
if len(buf) != expectedLen {
return nil, ErrBadEncoding
if len(buf) < expectedLen {
return nil, -1, ErrBadEncoding
}

nodes := make([]node, nn)
ss, nodes := SequenceSet{size: sz}, make([]node, nn)

ss := SequenceSet{
size: sz,
}
for i := 0; i < nn; i++ {
n := &nodes[i]
n.base = le.Uint64(buf[index:])
@@ -284,7 +321,51 @@ func Decode(buf []byte) (*SequenceSet, error) {
ss.insertNode(n)
}

return &ss, nil
return &ss, index, nil
}

// Helper to decode v1 into v2, which has fixed buckets of 32 vs the original 64.
func decodev1(buf []byte) (*SequenceSet, int, error) {
var le = binary.LittleEndian
index := 2
nn := int(le.Uint32(buf[index:]))
sz := int(le.Uint32(buf[index+4:]))
index += 8

const v1NumBuckets = 64

expectedLen := minLen + (nn * ((v1NumBuckets+1)*8 + 2))
if len(buf) < expectedLen {
return nil, -1, ErrBadEncoding
}

var ss SequenceSet
for i := 0; i < nn; i++ {
base := le.Uint64(buf[index:])
index += 8
for nb := uint64(0); nb < v1NumBuckets; nb++ {
n := le.Uint64(buf[index:])
// Walk all set bits and insert sequences manually for this decode from v1.
for pos := uint64(0); n != 0; pos++ {
if n&1 == 1 {
seq := base + (nb * uint64(bitsPerBucket)) + pos
ss.Insert(seq)
}
n >>= 1
}
index += 8
}
// Skip over encoded height.
index += 2
}

// Sanity check.
if ss.Size() != sz {
return nil, -1, ErrBadEncoding
}

return &ss, index, nil
}

// insertNode places a decoded node into the tree.
@@ -318,7 +399,7 @@ func (ss *SequenceSet) insertNode(n *node) {

const (
bitsPerBucket = 64 // bits in uint64
numBuckets = 64
numBuckets = 32
numEntries = numBuckets * bitsPerBucket
)

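
The new Decode signature returns the number of bytes it consumed, which is what lets several encoded sets be concatenated in one snapshot buffer and read back segment by segment. Below is a minimal sketch of that calling pattern; it assumes the avl package is importable at github.com/nats-io/nats-server/v2/server/avl and that segments are simply placed back to back (the snapshot framing itself is not part of this diff).

```go
package main

import (
	"fmt"

	"github.com/nats-io/nats-server/v2/server/avl"
)

func main() {
	// Two independent sequence sets, encoded back to back in one buffer.
	var a, b avl.SequenceSet
	for seq := uint64(1); seq <= 100; seq++ {
		a.Insert(seq)
	}
	b.Insert(5000)

	bufA, err := a.Encode(nil)
	if err != nil {
		panic(err)
	}
	bufB, err := b.Encode(nil)
	if err != nil {
		panic(err)
	}
	buf := append(bufA, bufB...)

	// Decode now reports how many bytes each set consumed, so a concatenated
	// buffer can be walked segment by segment.
	for len(buf) > 0 {
		ss, n, err := avl.Decode(buf)
		if err != nil {
			panic(err)
		}
		min, max, num := ss.State()
		fmt.Println(min, max, num)
		buf = buf[n:]
	}
}
```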
47 changes: 46 additions & 1 deletion server/avl/seqset_test.go
@@ -14,14 +14,15 @@
package avl

import (
"encoding/base64"
"math/rand"
"testing"
)

func TestSeqSetBasics(t *testing.T) {
var ss SequenceSet

seqs := []uint64{22, 222, 2222, 2, 2, 4}
seqs := []uint64{22, 222, 2000, 2, 2, 4}
for _, seq := range seqs {
ss.Insert(seq)
require_True(t, ss.Exists(seq))
@@ -235,6 +236,50 @@ func TestSeqSetFirst(t *testing.T) {
}
}

// Test that we can union with nodes vs individual sequence insertion.
func TestSeqSetDistinctUnion(t *testing.T) {
// Distinct sets.
var ss1 SequenceSet
seqs1 := []uint64{1, 10, 100, 200}
for _, seq := range seqs1 {
ss1.Insert(seq)
}

var ss2 SequenceSet
seqs2 := []uint64{5000, 6100, 6200, 6222}
for _, seq := range seqs2 {
ss2.Insert(seq)
}

ss := ss1.Clone()
allSeqs := append(seqs1, seqs2...)

ss.Union(&ss2)
require_True(t, ss.Size() == len(allSeqs))
for _, seq := range allSeqs {
require_True(t, ss.Exists(seq))
}
}

func TestSeqSetDecodeV1(t *testing.T) {
// Encoding from v1 which was 64 buckets.
seqs := []uint64{22, 222, 2222, 222_222, 2_222_222}
encStr := `
FgEDAAAABQAAAABgAwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAADgIQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAA==
`

enc, err := base64.StdEncoding.DecodeString(encStr)
require_NoError(t, err)

ss, _, err := Decode(enc)
require_NoError(t, err)

require_True(t, ss.Size() == len(seqs))
for _, seq := range seqs {
require_True(t, ss.Exists(seq))
}
}

func require_NoError(t *testing.T, err error) {
t.Helper()
if err != nil {
99 changes: 76 additions & 23 deletions server/events.go
@@ -193,18 +193,53 @@ type ServerID struct {
ID string `json:"id"`
}

// Type for our server capabilities.
type ServerCapability uint64

// ServerInfo identifies remote servers.
type ServerInfo struct {
Name string `json:"name"`
Host string `json:"host"`
ID string `json:"id"`
Cluster string `json:"cluster,omitempty"`
Domain string `json:"domain,omitempty"`
Version string `json:"ver"`
Tags []string `json:"tags,omitempty"`
Seq uint64 `json:"seq"`
JetStream bool `json:"jetstream"`
Time time.Time `json:"time"`
Name string `json:"name"`
Host string `json:"host"`
ID string `json:"id"`
Cluster string `json:"cluster,omitempty"`
Domain string `json:"domain,omitempty"`
Version string `json:"ver"`
Tags []string `json:"tags,omitempty"`
// Whether JetStream is enabled (deprecated in favor of the `ServerCapability`).
JetStream bool `json:"jetstream"`
// Generic capability flags
Flags ServerCapability
// Sequence and Time from the remote server for this message.
Seq uint64 `json:"seq"`
Time time.Time `json:"time"`
}

const (
JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled.
BinaryStreamSnapshot // New stream snapshot capability.
)

// Set JetStream capability.
func (si *ServerInfo) SetJetStreamEnabled() {
si.Flags |= JetStreamEnabled
// Still set old version.
si.JetStream = true
}

// JetStreamEnabled indicates whether or not we have JetStream enabled.
func (si *ServerInfo) JetStreamEnabled() bool {
// Take into account old version.
return si.Flags&JetStreamEnabled != 0 || si.JetStream
}

// Set binary stream snapshot capability.
func (si *ServerInfo) SetBinaryStreamSnapshot() {
si.Flags |= BinaryStreamSnapshot
}

// BinaryStreamSnapshot indicates whether or not we have binary stream snapshot capabilities.
func (si *ServerInfo) BinaryStreamSnapshot() bool {
return si.Flags&BinaryStreamSnapshot != 0
}

// ClientInfo is detailed information about the client forming a connection.
@@ -391,17 +426,21 @@ RESET:
case <-sendq.ch:
msgs := sendq.pop()
for _, pm := range msgs {
if pm.si != nil {
pm.si.Name = servername
pm.si.Domain = domain
pm.si.Host = host
pm.si.Cluster = cluster
pm.si.ID = id
pm.si.Seq = atomic.AddUint64(seqp, 1)
pm.si.Version = VERSION
pm.si.Time = time.Now().UTC()
pm.si.JetStream = js
pm.si.Tags = tags
if si := pm.si; si != nil {
si.Name = servername
si.Domain = domain
si.Host = host
si.Cluster = cluster
si.ID = id
si.Seq = atomic.AddUint64(seqp, 1)
si.Version = VERSION
si.Time = time.Now().UTC()
si.Tags = tags
if js {
// New capability based flags.
si.SetJetStreamEnabled()
si.SetBinaryStreamSnapshot()
}
}
var b []byte
if pm.msg != nil {
@@ -1418,7 +1457,9 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
si.Tags,
cfg,
stats,
false, si.JetStream,
false,
si.JetStreamEnabled(),
si.BinaryStreamSnapshot(),
})
}

@@ -1454,7 +1495,19 @@ func (s *Server) processNewServer(si *ServerInfo) {
node := getHash(si.Name)
// Only update if non-existent
if _, ok := s.nodeToInfo.Load(node); !ok {
s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Version, si.Cluster, si.Domain, si.ID, si.Tags, nil, nil, false, si.JetStream})
s.nodeToInfo.Store(node, nodeInfo{
si.Name,
si.Version,
si.Cluster,
si.Domain,
si.ID,
si.Tags,
nil,
nil,
false,
si.JetStreamEnabled(),
si.BinaryStreamSnapshot(),
})
}
}
// Announce ourselves.
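
As a usage note, the capability flags introduced in events.go stay backward compatible with the legacy boolean JetStream field. The sketch below condenses the types and methods shown in this diff into a standalone program purely to demonstrate that behavior; it is an illustration, not the server package itself.

```go
package main

import "fmt"

// Condensed from the diff above for illustration: capability bits carried in
// ServerInfo, kept backwards compatible with the old boolean JetStream field.
type ServerCapability uint64

const (
	JetStreamEnabled     ServerCapability = 1 << iota // server has JetStream enabled
	BinaryStreamSnapshot                              // server understands binary stream snapshots
)

type ServerInfo struct {
	JetStream bool             `json:"jetstream"` // legacy flag, still populated
	Flags     ServerCapability // generic capability flags
}

func (si *ServerInfo) SetJetStreamEnabled() {
	si.Flags |= JetStreamEnabled
	si.JetStream = true // keep the old field in sync for older servers
}

func (si *ServerInfo) JetStreamEnabled() bool {
	// An old server only sets the boolean, so honor either form.
	return si.Flags&JetStreamEnabled != 0 || si.JetStream
}

func (si *ServerInfo) SetBinaryStreamSnapshot() {
	si.Flags |= BinaryStreamSnapshot
}

func (si *ServerInfo) BinaryStreamSnapshot() bool {
	return si.Flags&BinaryStreamSnapshot != 0
}

func main() {
	// A new server advertises both capabilities; an old one sets only the boolean.
	newSrv, oldSrv := &ServerInfo{}, &ServerInfo{JetStream: true}
	newSrv.SetJetStreamEnabled()
	newSrv.SetBinaryStreamSnapshot()

	fmt.Println(newSrv.JetStreamEnabled(), newSrv.BinaryStreamSnapshot()) // true true
	fmt.Println(oldSrv.JetStreamEnabled(), oldSrv.BinaryStreamSnapshot()) // true false
}
```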
