Improve performance and latency with large number of sparse consumers. #3706

Merged
merged 1 commit into from Dec 13, 2022
50 changes: 50 additions & 0 deletions server/consumer.go
@@ -306,6 +306,9 @@ type consumer struct {

// Ack queue
ackMsgs *ipQueue

// For stream signaling.
sigSub *subscription
}

type proposal struct {
@@ -1062,6 +1065,9 @@ func (o *consumer) setLeader(isLeader bool) {
// Snapshot initial info.
o.infoWithSnap(true)

// Register as a leader with our parent stream.
mset.setConsumerAsLeader(o)

// Now start up Go routine to deliver msgs.
go o.loopAndGatherMsgs(qch)

@@ -1107,6 +1113,11 @@ func (o *consumer) setLeader(isLeader bool) {
stopAndClearTimer(&o.gwdtmr)
}
o.mu.Unlock()

// Unregister as a leader with our parent stream.
if mset != nil {
mset.removeConsumerAsLeader(o)
}
}
}
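The stream-side halves of these calls, `setConsumerAsLeader` and `removeConsumerAsLeader`, fall outside this hunk. As a rough sketch of the idea — the actual server registers `o.signalSub()` in a wildcard-aware signaling sublist (per the comment on `processStreamSignal` below), whereas the subject-keyed map here is a simplified stand-in and every name is illustrative:

```go
package main

import (
	"fmt"
	"sync"
)

type consumerSketch struct {
	filterSubject string // empty means "match everything"
}

type streamSketch struct {
	mu      sync.Mutex
	leaders map[string][]*consumerSketch // filter subject -> leader consumers
}

// setConsumerAsLeader records the consumer under its filter subject so that
// message ingress can signal only consumers whose filter can match a new
// message, instead of walking every consumer on the stream.
func (s *streamSketch) setConsumerAsLeader(o *consumerSketch) {
	s.mu.Lock()
	defer s.mu.Unlock()
	subj := o.filterSubject
	if subj == "" {
		subj = ">" // full wildcard: matches every subject
	}
	s.leaders[subj] = append(s.leaders[subj], o)
}

// removeConsumerAsLeader drops the consumer from the index when it loses
// leadership or shuts down.
func (s *streamSketch) removeConsumerAsLeader(o *consumerSketch) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for subj, cs := range s.leaders {
		for i, c := range cs {
			if c == o {
				s.leaders[subj] = append(cs[:i], cs[i+1:]...)
				return
			}
		}
	}
}

func main() {
	s := &streamSketch{leaders: make(map[string][]*consumerSketch)}
	o := &consumerSketch{filterSubject: "ID.2"}
	s.setConsumerAsLeader(o)
	fmt.Println(len(s.leaders["ID.2"])) // 1
	s.removeConsumerAsLeader(o)
	fmt.Println(len(s.leaders["ID.2"])) // 0
}
```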

@@ -4311,3 +4322,42 @@ func (o *consumer) account() *Account {
o.mu.RUnlock()
return a
}

func (o *consumer) signalSub() *subscription {
o.mu.Lock()
defer o.mu.Unlock()

if o.sigSub != nil {
return o.sigSub
}

subject := o.cfg.FilterSubject
if subject == _EMPTY_ {
subject = fwcs
}
return &subscription{subject: []byte(subject), icb: o.processStreamSignal}
}

// This is what will be called when our parent stream wants to kick us regarding a new message.
// We know that we are the leader and that this subject matches us by how the parent handles registering
// us with the signaling sublist.
// We do need the sequence of the message however and we use the msg as the encoded seq.
func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, subject, _ string, seqb []byte) {
var le = binary.LittleEndian
seq := le.Uint64(seqb)

o.mu.Lock()
defer o.mu.Unlock()
if o.mset == nil {
return
}
if seq > o.npcm {
o.npc++
}
if seq < o.sseq {
return
}
if o.isPushMode() && o.active || o.isPullMode() && !o.waiting.isEmpty() {
o.signalNewMessages()
}
}
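`processStreamSignal` above reads the stream sequence back out of the message payload as an 8-byte little-endian integer, so the stream side must encode it the same way when it kicks a consumer. A minimal sketch of that counterpart (the helper name is hypothetical, not from this diff):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeStreamSeq packs a stream sequence exactly the way processStreamSignal
// above unpacks it: 8 bytes, little-endian.
func encodeStreamSeq(seq uint64) []byte {
	var b [8]byte
	binary.LittleEndian.PutUint64(b[:], seq)
	return b[:]
}

func main() {
	payload := encodeStreamSeq(42)
	// The consumer callback reverses this with binary.LittleEndian.Uint64.
	fmt.Println(binary.LittleEndian.Uint64(payload)) // 42
}
```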
177 changes: 161 additions & 16 deletions server/norace_test.go
@@ -5785,6 +5785,26 @@ func TestNoRaceJetStreamDeleteConsumerWithInterestStreamAndHighSeqs(t *testing.T
}
}

// Bug when we encode a timestamp that upon decode causes an error which causes the server to panic.
// This can happen on consumer redelivery since the adjusted timestamps can be in the future, and result
// in a negative encoding. If that encoding was exactly -1 seconds, it would cause decodeConsumerState to fail
// and the server to panic.
func TestNoRaceEncodeConsumerStateBug(t *testing.T) {
for i := 0; i < 200_000; i++ {
// Pretend we redelivered and updated the timestamp to reflect the new start time for expiration.
// The bug will trip when time.Now() rounded to seconds in encode is 1 second below the truncated version
// of pending.
pending := Pending{Sequence: 1, Timestamp: time.Now().Add(time.Second).UnixNano()}
state := ConsumerState{
Delivered: SequencePair{Consumer: 1, Stream: 1},
Pending: map[uint64]*Pending{1: &pending},
}
buf := encodeConsumerState(&state)
_, err := decodeConsumerState(buf)
require_NoError(t, err)
}
}
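The arithmetic the comment describes can be shown directly. A standalone illustration (not the server's actual encoding) of how rounding "now" to seconds while truncating a future pending timestamp can yield a delta of exactly -1:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A redelivered message's pending timestamp sits ~1s in the future.
	now := time.Now()
	pending := now.Add(time.Second).UnixNano()

	// Suppose the encoder works in whole seconds: it truncates the
	// pending timestamp but rounds "now".
	truncPending := pending / int64(time.Second)
	roundedNow := now.Round(time.Second).UnixNano() / int64(time.Second)

	// Depending on where the wall clock falls within the second, the
	// stored delta comes out as exactly -1 — the value the decoder choked
	// on. The test above loops 200k times to reliably hit that window.
	fmt.Println("delta (s):", roundedNow-truncPending) // -1 or 0
}
```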

// Performance impact on stream ingress with large number of consumers.
func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) {
skip(t)
@@ -5880,22 +5900,147 @@ func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) {
fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds())
}

// Performance impact on large number of consumers but sparse delivery.
func TestJetStreamLargeNumConsumersSparseDelivery(t *testing.T) {
skip(t)

s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"ID.*"},
})
require_NoError(t, err)

// Now add in ~10k consumers on different subjects.
for i := 3; i <= 10_000; i++ {
_, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: fmt.Sprintf("d-%d", i),
FilterSubject: fmt.Sprintf("ID.%d", i),
AckPolicy: nats.AckNonePolicy,
})
require_NoError(t, err)
}

toSend := 100_000

// Bind a consumer to ID.2.
var received int
done := make(chan bool)

nc, js = jsClientConnect(t, s)
defer nc.Close()

mh := func(m *nats.Msg) {
received++
if received >= toSend {
close(done)
}
}
_, err = js.Subscribe("ID.2", mh)
require_NoError(t, err)

last := make(chan bool)
_, err = js.Subscribe("ID.1", func(_ *nats.Msg) { close(last) })
require_NoError(t, err)

nc, _ = jsClientConnect(t, s)
defer nc.Close()
js, err = nc.JetStream(nats.PublishAsyncMaxPending(8 * 1024))
require_NoError(t, err)

start := time.Now()
for i := 0; i < toSend; i++ {
js.PublishAsync("ID.2", []byte("ok"))
}
// Check latency for this one message.
// This will show the issue better than throughput which can bypass signal processing.
js.PublishAsync("ID.1", []byte("ok"))

select {
case <-done:
break
case <-time.After(10 * time.Second):
t.Fatalf("Failed to receive all messages: %d of %d\n", received, toSend)
}

tt := time.Since(start)
fmt.Printf("Took %v to receive %d msgs\n", tt, toSend)
fmt.Printf("%.0f msgs/s\n", float64(toSend)/tt.Seconds())

select {
case <-last:
break
case <-time.After(30 * time.Second):
t.Fatalf("Failed to receive last message\n")
}
lt := time.Since(start)

fmt.Printf("Took %v to receive last msg\n", lt)
}
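Note that both perf tests above are guarded by skip(t) and print their numbers via fmt.Printf rather than asserting thresholds; they are meant to be run by hand when profiling. The latency on the single sparse ID.1 message is the figure this PR targets: with ~10k filtered consumers, signaling only the consumers whose filter matches (via the sublist registration in consumer.go above) keeps that latency flat, where presumably evaluating every consumer on each ingress message would not.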

func TestNoRaceJetStreamEndToEndLatency(t *testing.T) {
// skip(t)

s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
require_NoError(t, err)

nc, js = jsClientConnect(t, s)
defer nc.Close()

var sent time.Time
next := make(chan struct{})

var (
total time.Duration
min time.Duration
max time.Duration
)
mh := func(m *nats.Msg) {
received := time.Now()
tt := received.Sub(sent)
if min == 0 || tt < min {
min = tt
}
if max == 0 || tt > max {
max = tt
}
total += tt
next <- struct{}{}
}
_, err = js.Subscribe("foo", mh)
require_NoError(t, err)

nc, js = jsClientConnect(t, s)
defer nc.Close()

toSend := 100_000
for i := 0; i < toSend; i++ {
sent = time.Now()
js.PublishAsync("foo", []byte("ok"))
<-next
}

fmt.Printf("AVG: %v\nMIN: %v\nMAX: %v\n", total/time.Duration(toSend), min, max)
}