Skip to content

Commit

Permalink
Fix consumer limits (#4567)
Browse files Browse the repository at this point in the history
If Stream has consumer limits set, creating consumer with defaults will
fail in most cases.

Test didn't catch this, as by default, old JS client sets ack policy to
`none`. If the policy is different, it will fail to create consumer with
defaults. We agreed that default Ack Policy should be `explicit`

 Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
neilalexander committed Sep 20, 2023
2 parents 0623e4b + ac2669a commit 496ca98
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
12 changes: 6 additions & 6 deletions server/consumer.go
Expand Up @@ -437,6 +437,12 @@ func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig,
if len(config.BackOff) > 0 {
config.AckWait = config.BackOff[0]
}
if config.MaxAckPending == 0 {
config.MaxAckPending = streamCfg.ConsumerLimits.MaxAckPending
}
if config.InactiveThreshold == 0 {
config.InactiveThreshold = streamCfg.ConsumerLimits.InactiveThreshold
}
// Set proper default for max ack pending if we are ack explicit and none has been set.
if (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) && config.MaxAckPending == 0 {
accPending := JsDefaultMaxAckPending
Expand All @@ -452,12 +458,6 @@ func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig,
if config.DeliverSubject == _EMPTY_ && config.MaxRequestBatch == 0 && lim.MaxRequestBatch > 0 {
config.MaxRequestBatch = lim.MaxRequestBatch
}
if config.MaxAckPending == 0 {
config.MaxAckPending = streamCfg.ConsumerLimits.MaxAckPending
}
if config.InactiveThreshold == 0 {
config.InactiveThreshold = streamCfg.ConsumerLimits.InactiveThreshold
}
}

// Check the consumer config. If we are recovering don't check filter subjects.
Expand Down
3 changes: 2 additions & 1 deletion server/jetstream_test.go
Expand Up @@ -21718,7 +21718,8 @@ func TestJetStreamConsumerDefaultsFromStream(t *testing.T) {

t.Run("InheritDefaultsFromStream", func(t *testing.T) {
ci, err := js.AddConsumer("test", &nats.ConsumerConfig{
Name: "InheritDefaultsFromStream",
Name: "InheritDefaultsFromStream",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

Expand Down

0 comments on commit 496ca98

Please sign in to comment.