Improve performance and latency with a large number of sparse consumers.
When a stream had a large number of sparse consumers on a server, the signaling mechanism would do a linear scan over all consumers to find and signal the matching ones. Usage patterns have continued to trend toward more consumers that are filtered and sparse, meaning a given message is destined for only one or a small number of consumers, so this linear scan had become increasingly expensive.

This change moves selection to a sublist that tracks only active consumer leaders, which keeps signaling efficient even when the number of consumers is large.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Dec 11, 2022
1 parent ef32cba commit 32d7458
Showing 3 changed files with 273 additions and 57 deletions.
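
To illustrate the approach described in the commit message, here is a minimal, self-contained Go sketch of signaling through a registry of active consumer leaders. It is not the server's actual implementation: the real code uses a wildcard-capable sublist keyed by each consumer's filter subject and also unregisters consumers on leader step-down, while this sketch assumes exact-match filter subjects and a plain map.

package main

import "fmt"

// signalFn is what a consumer leader registers to be kicked about a new message.
type signalFn func(subject string, seq uint64)

// stream tracks only active consumer leaders, keyed by their filter subject.
// The real server uses a wildcard-capable sublist; a plain map keeps this sketch small.
type stream struct {
	leaders map[string][]signalFn
}

func newStream() *stream {
	return &stream{leaders: make(map[string][]signalFn)}
}

// setConsumerAsLeader registers a leader's signal callback under its filter subject.
func (s *stream) setConsumerAsLeader(filter string, cb signalFn) {
	s.leaders[filter] = append(s.leaders[filter], cb)
}

// signal kicks only the consumers registered for this subject instead of doing
// a linear scan across every consumer on the stream.
func (s *stream) signal(subject string, seq uint64) {
	for _, cb := range s.leaders[subject] {
		cb(subject, seq)
	}
}

func main() {
	s := newStream()
	// Many sparse consumers, each filtered to its own subject.
	for i := 0; i < 10_000; i++ {
		s.setConsumerAsLeader(fmt.Sprintf("ID.%d", i), func(subj string, seq uint64) {
			fmt.Printf("signaled consumer on %s for seq %d\n", subj, seq)
		})
	}
	// A message on ID.2 touches only the single matching consumer leader.
	s.signal("ID.2", 22)
}

The point is that delivering a message only has to touch the consumers registered under the matching subject, rather than walking every consumer on the stream.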
50 changes: 50 additions & 0 deletions server/consumer.go
@@ -306,6 +306,9 @@ type consumer struct {

// Ack queue
ackMsgs *ipQueue

// For stream signaling.
sigSub *subscription
}

type proposal struct {
@@ -1062,6 +1065,9 @@ func (o *consumer) setLeader(isLeader bool) {
// Snapshot initial info.
o.infoWithSnap(true)

// Register as a leader with our parent stream.
mset.setConsumerAsLeader(o)

// Now start up Go routine to deliver msgs.
go o.loopAndGatherMsgs(qch)

@@ -1107,6 +1113,11 @@ func (o *consumer) setLeader(isLeader bool) {
stopAndClearTimer(&o.gwdtmr)
}
o.mu.Unlock()

// Unregister as a leader with our parent stream.
if mset != nil {
mset.removeConsumerAsLeader(o)
}
}
}

@@ -4311,3 +4322,42 @@ func (o *consumer) account() *Account {
o.mu.RUnlock()
return a
}

func (o *consumer) signalSub() *subscription {
o.mu.Lock()
defer o.mu.Unlock()

if o.sigSub != nil {
return o.sigSub
}

// No filter subject means we are interested in every message, so use the full wildcard.
subject := o.cfg.FilterSubject
if subject == _EMPTY_ {
subject = fwcs
}
return &subscription{subject: []byte(subject), icb: o.processStreamSignal}
}

// This is what will be called when our parent stream wants to kick us regarding a new message.
// We know that we are the leader and that this subject matches us by how the parent handles registering
// us with the signaling sublist.
// We do, however, need the sequence of the message, so the parent passes it as the msg payload, encoded as a uint64.
func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, subject, _ string, seqb []byte) {
var le = binary.LittleEndian
seq := le.Uint64(seqb)

o.mu.Lock()
defer o.mu.Unlock()
if o.mset == nil {
return
}
// If this message is past what our num pending calculation has accounted for, bump the running count.
if seq > o.npcm {
o.npc++
}
// Ignore if we have already moved past this sequence.
if seq < o.sseq {
return
}
// Signal if we are an active push consumer or a pull consumer with waiting requests.
if o.isPushMode() && o.active || o.isPullMode() && !o.waiting.isEmpty() {
o.signalNewMessages()
}
}
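
The signal payload is simply the stream sequence encoded as an 8-byte little-endian value, which is what le.Uint64(seqb) above decodes. The stream-side encoding lives in the third changed file, which is not shown in this view, but given the decode above a round trip presumably looks like this small sketch:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Sender side: encode the stream sequence of the new message as the signal payload.
	const seq uint64 = 42
	var seqb [8]byte
	binary.LittleEndian.PutUint64(seqb[:], seq)

	// Receiver side, as in processStreamSignal above: recover the sequence.
	decoded := binary.LittleEndian.Uint64(seqb[:])
	fmt.Println(decoded == seq) // true
}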
177 changes: 161 additions & 16 deletions server/norace_test.go
@@ -5785,6 +5785,26 @@ func TestNoRaceJetStreamDeleteConsumerWithInterestStreamAndHighSeqs(t *testing.T
}
}

// Bug when we encode a timestamp that upon decode causes an error, which causes the server to panic.
// This can happen on consumer redelivery since the adjusted timestamps can be in the future and result
// in a negative encoding. If that encoding was exactly -1 seconds, it would cause decodeConsumerState to fail
// and the server to panic.
func TestNoRaceEncodeConsumerStateBug(t *testing.T) {
for i := 0; i < 200_000; i++ {
// Pretend we redelivered and updated the timestamp to reflect the new start time for expiration.
// The bug will trip when time.Now() rounded to seconds in encode is 1 second below the truncated version
// of pending.
pending := Pending{Sequence: 1, Timestamp: time.Now().Add(time.Second).UnixNano()}
state := ConsumerState{
Delivered: SequencePair{Consumer: 1, Stream: 1},
Pending: map[uint64]*Pending{1: &pending},
}
buf := encodeConsumerState(&state)
_, err := decodeConsumerState(buf)
require_NoError(t, err)
}
}

// Performance impact on stream ingress with a large number of consumers.
func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) {
skip(t)
@@ -5880,22 +5900,147 @@ func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) {
fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds())
}

// Performance impact with a large number of consumers but sparse delivery.
func TestJetStreamLargeNumConsumersSparseDelivery(t *testing.T) {
skip(t)

s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"ID.*"},
})
require_NoError(t, err)

// Now add in ~10k consumers on different subjects.
for i := 3; i <= 10_000; i++ {
_, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: fmt.Sprintf("d-%d", i),
FilterSubject: fmt.Sprintf("ID.%d", i),
AckPolicy: nats.AckNonePolicy,
})
require_NoError(t, err)
}

toSend := 100_000

// Bind a consumer to ID.2.
var received int
done := make(chan bool)

nc, js = jsClientConnect(t, s)
defer nc.Close()

mh := func(m *nats.Msg) {
received++
if received >= toSend {
close(done)
}
}
_, err = js.Subscribe("ID.2", mh)
require_NoError(t, err)

last := make(chan bool)
_, err = js.Subscribe("ID.1", func(_ *nats.Msg) { close(last) })
require_NoError(t, err)

nc, _ = jsClientConnect(t, s)
defer nc.Close()
js, err = nc.JetStream(nats.PublishAsyncMaxPending(8 * 1024))
require_NoError(t, err)

start := time.Now()
for i := 0; i < toSend; i++ {
js.PublishAsync("ID.2", []byte("ok"))
}
// Check latency for this one message.
// This will show the issue better than throughput, which can bypass signal processing.
js.PublishAsync("ID.1", []byte("ok"))

select {
case <-done:
break
case <-time.After(10 * time.Second):
t.Fatalf("Failed to receive all messages: %d of %d\n", received, toSend)
}

tt := time.Since(start)
fmt.Printf("Took %v to receive %d msgs\n", tt, toSend)
fmt.Printf("%.0f msgs/s\n", float64(toSend)/tt.Seconds())

select {
case <-last:
break
case <-time.After(30 * time.Second):
t.Fatalf("Failed to receive last message\n")
}
lt := time.Since(start)

fmt.Printf("Took %v to receive last msg\n", lt)
}

func TestJetStreamEndToEndLatency(t *testing.T) {
// skip(t)

s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
require_NoError(t, err)

nc, js = jsClientConnect(t, s)
defer nc.Close()

var sent time.Time
next := make(chan struct{})

var (
total time.Duration
min time.Duration
max time.Duration
)
mh := func(m *nats.Msg) {
received := time.Now()
tt := received.Sub(sent)
if min == 0 || tt < min {
min = tt
}
if max == 0 || tt > max {
max = tt
}
total += tt
next <- struct{}{}
}
_, err = js.Subscribe("foo", mh)
require_NoError(t, err)

nc, js = jsClientConnect(t, s)
defer nc.Close()

toSend := 100_000
for i := 0; i < toSend; i++ {
sent = time.Now()
js.PublishAsync("foo", []byte("ok"))
<-next
}

fmt.Printf("AVG: %v\nMIN: %v\nMAX: %v\n", total/time.Duration(toSend), min, max)
}
