Skip to content

Commit

Permalink
Improve AckAll performance of first ack with large first sequence for…
Browse files Browse the repository at this point in the history
… the stream.

When the first ack was called on an AckAll consumer attached to a stream with a very high first sequence, the server would become unresponsive for that consumer and potentially the stream due to looping down to the existing ackfloor which in this case would be zero.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison authored and wallyqs committed May 17, 2024
1 parent 1bef75f commit 1f0c633
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 2 deletions.
2 changes: 1 addition & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2794,7 +2794,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
}
sagap = sseq - o.asflr
o.adflr, o.asflr = dseq, sseq
for seq := sseq; seq > sseq-sagap; seq-- {
for seq := sseq; seq > sseq-sagap && len(o.pending) > 0; seq-- {
delete(o.pending, seq)
delete(o.rdc, seq)
o.removeFromRedeliverQueue(seq)
Expand Down
2 changes: 1 addition & 1 deletion server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8616,7 +8616,7 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
sgap := sseq - o.state.AckFloor.Stream
o.state.AckFloor.Consumer = dseq
o.state.AckFloor.Stream = sseq
for seq := sseq; seq > sseq-sgap; seq-- {
for seq := sseq; seq > sseq-sgap && len(o.state.Pending) > 0; seq-- {
delete(o.state.Pending, seq)
if len(o.state.Redelivered) > 0 {
delete(o.state.Redelivered, seq)
Expand Down
45 changes: 45 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22498,3 +22498,48 @@ func TestInterestStreamWithFilterSubjectsConsumer(t *testing.T) {
t.Fatalf("expected 2 messages got %d", nfo.State.Msgs)
}
}

func TestJetStreamAckAllWithLargeFirstSequenceAndNoAckFloor(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.>"},
})
require_NoError(t, err)

// Set first sequence to something very big here. This shows the issue with AckAll the
// first time it is called and existing ack floor is 0.
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 10_000_000_000})
require_NoError(t, err)

// Now add in 100 msgs
for i := 0; i < 100; i++ {
js.Publish("foo.bar", []byte("hello"))
}

ss, err := js.PullSubscribe("foo.*", "TEST", nats.AckAll())
require_NoError(t, err)
msgs, err := ss.Fetch(10, nats.MaxWait(100*time.Millisecond))
require_NoError(t, err)
require_Equal(t, len(msgs), 10)

start := time.Now()
msgs[9].AckSync()
if elapsed := time.Since(start); elapsed > 250*time.Millisecond {
t.Fatalf("AckSync took too long %v", elapsed)
}

// Make sure next fetch works right away with low timeout.
msgs, err = ss.Fetch(10, nats.MaxWait(100*time.Millisecond))
require_NoError(t, err)
require_Equal(t, len(msgs), 10)

_, err = js.StreamInfo("TEST", nats.MaxWait(250*time.Millisecond))
require_NoError(t, err)
}

0 comments on commit 1f0c633

Please sign in to comment.