Skip to content

Commit

Permalink
[IMPROVED] Consumer with AckAll performance of first ack with large f…
Browse files Browse the repository at this point in the history
…irst stream sequence. (#5446)

When the first ack was called on an AckAll consumer attached to a stream
with a very high first sequence, the server would become unresponsive
for that consumer and potentially the stream due to looping down to the
existing ackfloor which in this case would be zero.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed May 17, 2024
2 parents 6ee937f + 04e4f4e commit 61e5a7c
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 2 deletions.
2 changes: 1 addition & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2886,7 +2886,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
}
sagap = sseq - o.asflr
o.adflr, o.asflr = dseq, sseq
for seq := sseq; seq > sseq-sagap; seq-- {
for seq := sseq; seq > sseq-sagap && len(o.pending) > 0; seq-- {
delete(o.pending, seq)
delete(o.rdc, seq)
o.removeFromRedeliverQueue(seq)
Expand Down
2 changes: 1 addition & 1 deletion server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8729,7 +8729,7 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
sgap := sseq - o.state.AckFloor.Stream
o.state.AckFloor.Consumer = dseq
o.state.AckFloor.Stream = sseq
for seq := sseq; seq > sseq-sgap; seq-- {
for seq := sseq; seq > sseq-sgap && len(o.state.Pending) > 0; seq-- {
delete(o.state.Pending, seq)
if len(o.state.Redelivered) > 0 {
delete(o.state.Redelivered, seq)
Expand Down
45 changes: 45 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23503,3 +23503,48 @@ func TestInterestStreamWithFilterSubjectsConsumer(t *testing.T) {
t.Fatalf("expected 2 messages got %d", nfo.State.Msgs)
}
}

func TestJetStreamAckAllWithLargeFirstSequenceAndNoAckFloor(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.>"},
})
require_NoError(t, err)

// Set first sequence to something very big here. This shows the issue with AckAll the
// first time it is called and existing ack floor is 0.
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 10_000_000_000})
require_NoError(t, err)

// Now add in 100 msgs
for i := 0; i < 100; i++ {
js.Publish("foo.bar", []byte("hello"))
}

ss, err := js.PullSubscribe("foo.*", "TEST", nats.AckAll())
require_NoError(t, err)
msgs, err := ss.Fetch(10, nats.MaxWait(100*time.Millisecond))
require_NoError(t, err)
require_Equal(t, len(msgs), 10)

start := time.Now()
msgs[9].AckSync()
if elapsed := time.Since(start); elapsed > 250*time.Millisecond {
t.Fatalf("AckSync took too long %v", elapsed)
}

// Make sure next fetch works right away with low timeout.
msgs, err = ss.Fetch(10, nats.MaxWait(100*time.Millisecond))
require_NoError(t, err)
require_Equal(t, len(msgs), 10)

_, err = js.StreamInfo("TEST", nats.MaxWait(250*time.Millisecond))
require_NoError(t, err)
}

0 comments on commit 61e5a7c

Please sign in to comment.