Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Durable pull consumers could get cleaned up incorrectly on leader change. #4412

Merged
merged 1 commit into from Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 9 additions & 10 deletions server/consumer.go
Expand Up @@ -1091,7 +1091,7 @@ func (o *consumer) setLeader(isLeader bool) {
if o.dthresh > 0 && (o.isPullMode() || !o.active) {
// Pull consumer. We run the dtmr all the time for this one.
stopAndClearTimer(&o.dtmr)
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}

// If we are not in ReplayInstant mode mark us as in replay state until resolved.
Expand Down Expand Up @@ -1121,7 +1121,6 @@ func (o *consumer) setLeader(isLeader bool) {
if pullMode {
// Now start up Go routine to process inbound next message requests.
go o.processInboundNextMsgReqs(qch)

}

// If we are R>1 spin up our proposal loop.
Expand All @@ -1140,7 +1139,10 @@ func (o *consumer) setLeader(isLeader bool) {
close(o.qch)
o.qch = nil
}
// Make sure to clear out any re delivery queues
// Stop any inactivity timers. Should only be running on leaders.
stopAndClearTimer(&o.dtmr)

// Make sure to clear out any re-deliver queues
stopAndClearTimer(&o.ptmr)
o.rdq, o.rdqi = nil, nil
o.pending = nil
Expand All @@ -1156,9 +1158,6 @@ func (o *consumer) setLeader(isLeader bool) {
// Reset waiting if we are in pull mode.
if o.isPullMode() {
o.waiting = newWaitQueue(o.cfg.MaxWaiting)
if !o.isDurable() {
stopAndClearTimer(&o.dtmr)
}
o.nextMsgReqs.drain()
} else if o.srv.gateway.enabled {
stopAndClearTimer(&o.gwdtmr)
Expand Down Expand Up @@ -1349,7 +1348,7 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
// If we do not have interest anymore and have a delete threshold set, then set
// a timer to delete us. We wait for a bit in case of server reconnect.
if !interest && o.dthresh > 0 {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
return true
}
return false
Expand All @@ -1376,7 +1375,7 @@ func (o *consumer) deleteNotActive() {
if o.dtmr != nil {
o.dtmr.Reset(o.dthresh - elapsed)
} else {
o.dtmr = time.AfterFunc(o.dthresh-elapsed, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh-elapsed, o.deleteNotActive)
}
o.mu.Unlock()
return
Expand All @@ -1386,7 +1385,7 @@ func (o *consumer) deleteNotActive() {
if o.dtmr != nil {
o.dtmr.Reset(o.dthresh)
} else {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}
o.mu.Unlock()
return
Expand Down Expand Up @@ -1640,7 +1639,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
stopAndClearTimer(&o.dtmr)
// Restart timer only if we are the leader.
if o.isLeader() && o.dthresh > 0 {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}
}

Expand Down
46 changes: 46 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -5026,3 +5026,49 @@ func TestJetStreamClusterOrphanConsumerSubjects(t *testing.T) {
require_NotEqual(t, info.Cluster.Leader, "")
require_Equal(t, len(info.Cluster.Replicas), 2)
}

// TestJetStreamClusterDurableConsumerInactiveThresholdLeaderSwitch checks that a
// durable pull consumer created with an inactive threshold is not removed after
// its leader steps down while the consumer is still seeing activity.
//
// The test creates a 3 node cluster with an R3 stream, creates the durable pull
// consumer "dlc" with a short inactive threshold, forces a consumer leader
// change, generates activity (a fetch plus a synchronous ack) and then polls
// ConsumerInfo for one threshold interval, failing if the consumer is reported
// as not found.
func TestJetStreamClusterDurableConsumerInactiveThresholdLeaderSwitch(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"*"},
		Replicas: 3,
	})
	require_NoError(t, err)

	// Queue a msg.
	sendStreamMsg(t, nc, "foo", "ok")

	thresh := 250 * time.Millisecond

	// This will start the timer.
	sub, err := js.PullSubscribe("foo", "dlc", nats.InactiveThreshold(thresh))
	require_NoError(t, err)

	// Switch over leader.
	cl := c.consumerLeader(globalAccountName, "TEST", "dlc")
	cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "dlc")
	c.waitOnConsumerLeader(globalAccountName, "TEST", "dlc")

	// Create activity on this consumer.
	msgs, err := sub.Fetch(1)
	require_NoError(t, err)
	require_True(t, len(msgs) == 1)

	// This is considered activity as well, so we can now watch for up to thresh
	// to make sure the consumer is still active. The ack must succeed, otherwise
	// the activity this test depends on never happened.
	require_NoError(t, msgs[0].AckSync())

	// The consumer should not disappear for next `thresh` interval unless old leader does so.
	// Only a "not found" result fails the test; other ConsumerInfo errors are
	// ignored and the poll simply continues.
	timeout := time.Now().Add(thresh)
	for time.Now().Before(timeout) {
		_, err := js.ConsumerInfo("TEST", "dlc")
		if err == nats.ErrConsumerNotFound {
			t.Fatalf("Consumer deleted when it should not have been")
		}
	}
}