Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added BackOff helper method to set backoff through subOpts #933

Merged
merged 2 commits into from Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions example_test.go
Expand Up @@ -620,6 +620,11 @@ func ExampleSubOpt() {
js.Subscribe("foo", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.StartTime(time.Now().Add(-2*time.Hour)))

// Start delivering messages with delay based on BackOff array of time durations.
js.Subscribe("foo", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.BackOff([]time.Duration{50 * time.Millisecond, 250 * time.Millisecond}))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This complies, but would actually not "work" because you would need to set the MaxDeliver to at least 2 (or more), but also add nats.ManualAck() because otherwise the callback would auto-ack anyway, so it is unlikely that there would be redeliveries...

So instead (or at least in addition to) having this here, I would update, say TestJetStreamSubscribe_AckPolicy test to use the nats.BackOff() new option. In this test (at the end of it), we use backoff but with the AddConsumer directly.

}

func ExampleMaxWait() {
Expand Down
8 changes: 8 additions & 0 deletions js.go
Expand Up @@ -2237,6 +2237,14 @@ func RateLimit(n uint64) SubOpt {
})
}

// BackOff is an array of time durations that represent the time to delay based on delivery count.
func BackOff(backOff []time.Duration) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.BackOff = backOff
return nil
})
}

// BindStream binds a consumer to a stream explicitly based on a name.
// When a stream name is not specified, the library uses the subscribe
// subject as a way to find the stream name. It is done by making a request
Expand Down