Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jetstream.Consumer.Consume() does not respect ConsumerConfig #1588

Open
wood-jp opened this issue Mar 20, 2024 · 1 comment
Open

jetstream.Consumer.Consume() does not respect ConsumerConfig #1588

wood-jp opened this issue Mar 20, 2024 · 1 comment
Assignees
Labels
defect Suspected defect such as a bug or regression

Comments

@wood-jp
Copy link

wood-jp commented Mar 20, 2024

Observed behavior

The Consume func allows for several options, including PullMaxMessages. However, PullMaxMessages defaults to 500 if not provided as an option here. At the same time, when creating a consumer in the first place, the config allows for setting MaxRequestBatch. If MaxRequestBatch is less than PullMaxMessages then the Consume func will always fail with error nats: Exceeded MaxRequestBatch of X. Additionally, this error is only exposed if passing the optional jetstream.ConsumeErrHandler to expose it - otherwise the consumer will silently fail to consume anything.

Expected behavior

  1. The Consume func should inherit default values (such as PullMaxMessages) from the consumer config, or otherwise not allow this situation to begin with.
  2. The Consume func should return a non-nil error if the options make consuming impossible (such as a consumer with MaxRequestBatch less than the set PullMaxMessages if that's still a option after fixing (1))

Server and client version

github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.33.1

Host environment

No response

Steps to reproduce

A full example requires some fiddling, but here's the short version:

consumer, _ := js.CreateOrUpdateConsumer(
    context.Background(),
    "streamname",
    jetstream.ConsumerConfig{
	MaxRequestBatch: 1,
    }),
)

cc, err := consumer.Consume(
    func(msg jetstream.Msg) {
       fmt.Println("handled a message")
    },
    // handle consumer errors
    jetstream.ConsumeErrHandler(func(_ jetstream.ConsumeContext, handlerErr error) {
       fmt.Println(handlerErr)
    }),
)
// err == nil here but should not be

Results in no handled messages, and outputs the error nats: Exceeded MaxRequestBatch of 1

@wood-jp wood-jp added the defect Suspected defect such as a bug or regression label Mar 20, 2024
@piotrpio
Copy link
Collaborator

Hey @wood-jp, thank's for reporting the issue. You're right, we should be taking consumer config into account on Consume. I think adjusting the default is reasonable, the only risk is that the consumer may be updated asynchronously between when you creating/fetchcing the consumer info and calling Consume (update may change e.g. MaxRequestBatch).

@piotrpio piotrpio self-assigned this Mar 20, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
defect Suspected defect such as a bug or regression
Projects
None yet
Development

No branches or pull requests

2 participants