Skip to content

Commit

Permalink
[FIXED] Consumer ack pending > max ack pending on restart or leader c…
Browse files Browse the repository at this point in the history
…hange (#4427)

When a consumer reached a max delivered condition, we did not properly
synchronize the state, so on a restore or leader switch the ack pending
count could jump above max ack pending and block the consumer.

This change propagates a delivered update and updates the store state
engine to remove the message from pending when the condition is reached.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Aug 24, 2023
2 parents 5a926f1 + 48bf7ba commit c9b5b32
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 11 deletions.
5 changes: 4 additions & 1 deletion server/consumer.go
Expand Up @@ -3092,7 +3092,10 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
o.notifyDeliveryExceeded(seq, dc-1)
}
// Make sure to remove from pending.
delete(o.pending, seq)
if p, ok := o.pending[seq]; ok && p != nil {
delete(o.pending, seq)
o.updateDelivered(p.Sequence, seq, dc, p.Timestamp)
}
continue
}
if seq > 0 {
Expand Down
18 changes: 13 additions & 5 deletions server/filestore.go
Expand Up @@ -6946,20 +6946,28 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err
}

if dc > 1 {
if maxdc := uint64(o.cfg.MaxDeliver); maxdc > 0 && dc > maxdc {
// Make sure to remove from pending.
delete(o.state.Pending, sseq)
}
if o.state.Redelivered == nil {
o.state.Redelivered = make(map[uint64]uint64)
}
// Only update if greater than what we already have.
if o.state.Redelivered[sseq] < dc {
if o.state.Redelivered[sseq] < dc-1 {
o.state.Redelivered[sseq] = dc - 1
}
}
} else {
// For AckNone just update delivered and ackfloor at the same time.
o.state.Delivered.Consumer = dseq
o.state.Delivered.Stream = sseq
o.state.AckFloor.Consumer = dseq
o.state.AckFloor.Stream = sseq
if dseq > o.state.Delivered.Consumer {
o.state.Delivered.Consumer = dseq
o.state.AckFloor.Consumer = dseq
}
if sseq > o.state.Delivered.Stream {
o.state.Delivered.Stream = sseq
o.state.AckFloor.Stream = sseq
}
}
// Make sure we flush to disk.
o.kickFlusher()
Expand Down
119 changes: 119 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -5156,3 +5156,122 @@ func TestJetStreamClusterDurableConsumerInactiveThresholdLeaderSwitch(t *testing
}
}
}

// TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug checks that the
// ConsumerInfo reported for a consumer configured with MaxDeliver(1) and
// MaxAckPending(10) is the same before and after a consumer leader step-down
// (file and memory backed) and before and after a restart of the consumer
// leader's server (file backed, R1).
func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"*"},
		Replicas: 3,
	})
	require_NoError(t, err)

	// Publish 50 messages up front.
	const numMsgs = 50
	for sent := 0; sent < numMsgs; sent++ {
		_, err := js.Publish("foo", []byte("ok"))
		require_NoError(t, err)
	}

	// assertSameInfo compares two snapshots while ignoring the cluster details,
	// which we know will change due to a leader change, and the last activity
	// for delivered, which can be slightly off.
	assertSameInfo := func(before, after *nats.ConsumerInfo) {
		t.Helper()
		before.Cluster, after.Cluster = nil, nil
		before.Delivered.Last, after.Delivered.Last = nil, nil
		if reflect.DeepEqual(before, after) {
			return
		}
		t.Fatalf("ConsumerInfo do not match\n\t%+v\n\t%+v", before, after)
	}

	// startConsumer subscribes a durable that never acks, waits slightly longer
	// than the one second ack wait, and returns the consumer info at that point.
	startConsumer := func(durable string, extra ...nats.SubOpt) *nats.ConsumerInfo {
		t.Helper()
		opts := []nats.SubOpt{
			nats.Durable(durable),
			nats.ManualAck(),
			nats.MaxDeliver(1),
			nats.AckWait(time.Second),
			nats.MaxAckPending(10),
		}
		opts = append(opts, extra...)

		_, err := js.Subscribe("foo", func(msg *nats.Msg) {}, opts...)
		require_NoError(t, err)

		// Let the first batch be delivered and its ack wait expire.
		time.Sleep(1200 * time.Millisecond)

		info, err := js.ConsumerInfo("TEST", durable)
		require_NoError(t, err)
		return info
	}

	// checkAcrossStepDown forces a consumer leader step-down and requires the
	// new leader to report the same state as the previous one did.
	checkAcrossStepDown := func(durable string, extra ...nats.SubOpt) {
		t.Helper()
		before := startConsumer(durable, extra...)

		// Make sure followers will have exact same state.
		subj := fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", durable)
		_, err := nc.Request(subj, nil, time.Second)
		require_NoError(t, err)
		c.waitOnConsumerLeader(globalAccountName, "TEST", durable)

		after, err := js.ConsumerInfo("TEST", durable)
		require_NoError(t, err)

		assertSameInfo(before, after)
	}

	// File based.
	checkAcrossStepDown("file")

	// Memory based.
	checkAcrossStepDown("mem", nats.ConsumerMemoryStorage())

	// Now file based but R1 and server restart.
	before := startConsumer("r1", nats.ConsumerReplicas(1))

	leader := c.consumerLeader(globalAccountName, "TEST", "r1")
	leader.Shutdown()
	leader.WaitForShutdown()
	leader = c.restartServer(leader)
	c.waitOnServerCurrent(leader)

	after, err := js.ConsumerInfo("TEST", "r1")
	require_NoError(t, err)

	// Created can skew a small bit due to server restart, this is expected,
	// so pin both snapshots to the same timestamp before comparing.
	now := time.Now()
	before.Created, after.Created = now, now
	assertSameInfo(before, after)
}
21 changes: 16 additions & 5 deletions server/memstore.go
Expand Up @@ -1307,17 +1307,28 @@ func (o *consumerMemStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) erro
}

if dc > 1 {
if maxdc := uint64(o.cfg.MaxDeliver); maxdc > 0 && dc > maxdc {
// Make sure to remove from pending.
delete(o.state.Pending, sseq)
}
if o.state.Redelivered == nil {
o.state.Redelivered = make(map[uint64]uint64)
}
o.state.Redelivered[sseq] = dc - 1
// Only update if greater than what we already have.
if o.state.Redelivered[sseq] < dc-1 {
o.state.Redelivered[sseq] = dc - 1
}
}
} else {
// For AckNone just update delivered and ackfloor at the same time.
o.state.Delivered.Consumer = dseq
o.state.Delivered.Stream = sseq
o.state.AckFloor.Consumer = dseq
o.state.AckFloor.Stream = sseq
if dseq > o.state.Delivered.Consumer {
o.state.Delivered.Consumer = dseq
o.state.AckFloor.Consumer = dseq
}
if sseq > o.state.Delivered.Stream {
o.state.Delivered.Stream = sseq
o.state.AckFloor.Stream = sseq
}
}

return nil
Expand Down

0 comments on commit c9b5b32

Please sign in to comment.