[FIXED] Durable pull consumers could get cleaned up incorrectly on leader change. (#4412)

Fix for a bug that could allow an old leader of a pull-based durable consumer to
inadvertently delete the consumer from its inactivity threshold timer.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Aug 21, 2023
2 parents 6e3ae20 + 43314fd commit 2fc3f45
Showing 2 changed files with 55 additions and 10 deletions.
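
In short, the inactivity (delete) timer for a durable pull consumer should only run on the current consumer leader, and it must be stopped when a node loses leadership so a stale callback cannot delete the consumer. The following is a minimal, self-contained sketch of that pattern; the type and method names are simplified illustrations, not the server's actual code in server/consumer.go, and the extra leadership check inside the callback is belt-and-braces for the sketch rather than something the commit necessarily adds.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Illustrative-only consumer holding an inactivity threshold timer.
type consumer struct {
	mu      sync.Mutex
	leader  bool
	dthresh time.Duration
	dtmr    *time.Timer
}

// setLeader mirrors the pattern the fix enforces: start the timer when we
// become leader, and always stop it when we step down.
func (c *consumer) setLeader(isLeader bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.leader = isLeader
	if isLeader {
		if c.dthresh > 0 {
			// Only the leader runs the inactivity timer.
			c.dtmr = time.AfterFunc(c.dthresh, c.deleteNotActive)
		}
		return
	}
	// Losing leadership: stop and clear the timer so this node can never
	// delete the consumer from a stale inactivity callback.
	if c.dtmr != nil {
		c.dtmr.Stop()
		c.dtmr = nil
	}
}

// deleteNotActive double-checks leadership before acting, so even a callback
// that slipped through cannot delete the consumer from an old leader.
func (c *consumer) deleteNotActive() {
	c.mu.Lock()
	isLeader := c.leader
	c.mu.Unlock()
	if !isLeader {
		return
	}
	fmt.Println("leader observed no activity; deleting consumer")
}

func main() {
	c := &consumer{dthresh: 250 * time.Millisecond}
	c.setLeader(true)  // starts the timer
	c.setLeader(false) // leadership moved away; timer stopped, no deletion here
	time.Sleep(500 * time.Millisecond)
}
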
19 changes: 9 additions & 10 deletions server/consumer.go
@@ -1091,7 +1091,7 @@ func (o *consumer) setLeader(isLeader bool) {
if o.dthresh > 0 && (o.isPullMode() || !o.active) {
// Pull consumer. We run the dtmr all the time for this one.
stopAndClearTimer(&o.dtmr)
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}

// If we are not in ReplayInstant mode mark us as in replay state until resolved.
@@ -1121,7 +1121,6 @@ func (o *consumer) setLeader(isLeader bool) {
if pullMode {
// Now start up Go routine to process inbound next message requests.
go o.processInboundNextMsgReqs(qch)

}

// If we are R>1 spin up our proposal loop.
@@ -1140,7 +1139,10 @@ func (o *consumer) setLeader(isLeader bool) {
close(o.qch)
o.qch = nil
}
// Make sure to clear out any re delivery queues
// Stop any inactivity timers. Should only be running on leaders.
stopAndClearTimer(&o.dtmr)

// Make sure to clear out any re-deliver queues
stopAndClearTimer(&o.ptmr)
o.rdq, o.rdqi = nil, nil
o.pending = nil
@@ -1156,9 +1158,6 @@ func (o *consumer) setLeader(isLeader bool) {
// Reset waiting if we are in pull mode.
if o.isPullMode() {
o.waiting = newWaitQueue(o.cfg.MaxWaiting)
if !o.isDurable() {
stopAndClearTimer(&o.dtmr)
}
o.nextMsgReqs.drain()
} else if o.srv.gateway.enabled {
stopAndClearTimer(&o.gwdtmr)
@@ -1349,7 +1348,7 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
// If we do not have interest anymore and have a delete threshold set, then set
// a timer to delete us. We wait for a bit in case of server reconnect.
if !interest && o.dthresh > 0 {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
return true
}
return false
@@ -1376,7 +1375,7 @@ func (o *consumer) deleteNotActive() {
if o.dtmr != nil {
o.dtmr.Reset(o.dthresh - elapsed)
} else {
o.dtmr = time.AfterFunc(o.dthresh-elapsed, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh-elapsed, o.deleteNotActive)
}
o.mu.Unlock()
return
@@ -1386,7 +1385,7 @@ func (o *consumer) deleteNotActive() {
if o.dtmr != nil {
o.dtmr.Reset(o.dthresh)
} else {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}
o.mu.Unlock()
return
@@ -1640,7 +1639,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
stopAndClearTimer(&o.dtmr)
// Restart timer only if we are the leader.
if o.isLeader() && o.dthresh > 0 {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}
}

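
The consumer.go changes above lean on a helper named stopAndClearTimer. For reference, a minimal version of what such a helper typically looks like is sketched below; this is an assumption for illustration, not a copy of the server's implementation, which may differ in details.

package sketch

import "time"

// stopAndClearTimer stops the timer pointed to by tp, if set, and nils out
// the pointer so a later leadership change cannot observe or restart a stale
// timer. Sketch only.
func stopAndClearTimer(tp **time.Timer) {
	if *tp == nil {
		return
	}
	(*tp).Stop()
	*tp = nil
}
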
46 changes: 46 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -5110,3 +5110,49 @@ func TestJetStreamClusterOrphanConsumerSubjects(t *testing.T) {
require_NotEqual(t, info.Cluster.Leader, "")
require_Equal(t, len(info.Cluster.Replicas), 2)
}

func TestJetStreamClusterDurableConsumerInactiveThresholdLeaderSwitch(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Replicas: 3,
})
require_NoError(t, err)

// Queue a msg.
sendStreamMsg(t, nc, "foo", "ok")

thresh := 250 * time.Millisecond

// This will start the timer.
sub, err := js.PullSubscribe("foo", "dlc", nats.InactiveThreshold(thresh))
require_NoError(t, err)

// Switch over leader.
cl := c.consumerLeader(globalAccountName, "TEST", "dlc")
cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "dlc")
c.waitOnConsumerLeader(globalAccountName, "TEST", "dlc")

// Create activity on this consumer.
msgs, err := sub.Fetch(1)
require_NoError(t, err)
require_True(t, len(msgs) == 1)

// This is considered activity as well, so we can now watch for up to thresh to make sure the consumer stays active.
msgs[0].AckSync()

// The consumer should not disappear during the next `thresh` interval unless the old leader incorrectly deletes it.
timeout := time.Now().Add(thresh)
for time.Now().Before(timeout) {
_, err := js.ConsumerInfo("TEST", "dlc")
if err == nats.ErrConsumerNotFound {
t.Fatalf("Consumer deleted when it should not have been")
}
}
}
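
For context, the client-side setup this test exercises looks roughly like the sketch below, using the nats.go JetStream API. It assumes a JetStream-enabled server reachable at the default URL with a stream already covering the foo subject; the subject and durable name are illustrative.

package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Durable pull consumer with an inactivity threshold. Before this fix, a
	// former consumer leader could still fire its threshold timer and delete
	// the consumer even while it was active on the new leader.
	sub, err := js.PullSubscribe("foo", "dlc", nats.InactiveThreshold(250*time.Millisecond))
	if err != nil {
		log.Fatal(err)
	}

	msgs, err := sub.Fetch(1, nats.MaxWait(2*time.Second))
	if err != nil {
		log.Fatal(err)
	}
	for _, m := range msgs {
		// Acking counts as activity and keeps the inactivity timer from firing.
		if err := m.AckSync(); err != nil {
			log.Fatal(err)
		}
	}
}
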
