Fetch in the JetStream Simplified Client should behave similarly to the old API. #1590
Comments
Hey @joeledstrom. The new API differs from
Hi again, sorry for the delay. This doesn't really work well if you want to emulate the following functionality from the old API. With the old API you can pull between 1 and 100 messages using the code below, which blocks until it has at least one message. How many messages you get depends on how many are pending in the consumer. This gives great batched-insertion performance into a database.

```go
for {
	// Recreate the timeout for every pull; call cancel once the pull
	// returns to release the timer (the original discarded the cancel func).
	fetchCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	msgs, err := sub.Fetch(100, nats.Context(fetchCtx))
	cancel()
	if err != nil && errors.Is(err, context.DeadlineExceeded) {
		continue
	}
	if err != nil {
		return err
	}
	fmt.Println("got num messages:", len(msgs))
	if err := handleBatch(msgs); err != nil {
		return err
	}
	// ack all messages or use AckAllPolicy
}
```

Running the above code, the output usually looks like this (i.e. variable batch sizes, but no delay if one message arrives):
New API

This is the closest way I've found to emulate the old behaviour using the new API:

```go
func consumerLoop(ctx context.Context, c jetstream.Consumer) error {
	fetchLimit := 100
	msgChan := make(chan jetstream.Msg, fetchLimit)
	errChan := make(chan error, 1)
	mc, err := c.Messages(jetstream.PullMaxMessages(fetchLimit), jetstream.PullExpiry(30*time.Second)) //, jetstream.PullHeartbeat(5*time.Second))
	if err != nil {
		return err
	}
	// save the first error in errChan
	setError := func(err error) {
		select {
		case errChan <- err:
		default:
		}
	}
	context.AfterFunc(ctx, func() {
		setError(ctx.Err())
		mc.Drain() // causes mc.Next to return ErrMsgIteratorClosed after processing all leftover messages
	})
	go func() {
		defer close(msgChan)
		for {
			msg, err := mc.Next()
			if err != nil {
				setError(err)
				return
			}
			msgChan <- msg
		}
	}()
	for {
		msgs, ok := readBufferedMessages(msgChan, fetchLimit)
		if !ok {
			return fmt.Errorf("msgChan closed: %w", <-errChan)
		}
		if err := handleBatch(msgs); err != nil {
			return err
		}
		// ack all messages or use AckAllPolicy
	}
}

// readBufferedMessages blocks until it gets at least one message (sync channel read),
// then reads buffered messages from the channel up to the limit (async channel reads).
// It aborts and returns false if the channel is closed.
func readBufferedMessages(msgChan <-chan jetstream.Msg, limit int) ([]jetstream.Msg, bool) {
	var msgs []jetstream.Msg
	msg, ok := <-msgChan
	if !ok {
		return nil, false
	}
	msgs = append(msgs, msg)
	for len(msgs) < limit {
		select {
		case msg, ok := <-msgChan:
			if !ok {
				return msgs, false
			}
			msgs = append(msgs, msg)
			continue
		default:
			return msgs, true
		}
	}
	return msgs, true
}
```

Am I missing something? :)
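As an aside, the channel-draining pattern in `readBufferedMessages` is independent of JetStream. A minimal generic sketch of the same logic (`drainUpTo` is a hypothetical helper, not part of nats.go) can be tested with plain channels:

```go
package main

import "fmt"

// drainUpTo blocks for one value (sync read), then greedily drains any
// buffered values from ch up to limit (async reads via select/default).
// It returns ok=false if ch is closed.
func drainUpTo[T any](ch <-chan T, limit int) (batch []T, ok bool) {
	v, ok := <-ch
	if !ok {
		return nil, false
	}
	batch = append(batch, v)
	for len(batch) < limit {
		select {
		case v, ok := <-ch:
			if !ok {
				return batch, false
			}
			batch = append(batch, v)
		default: // channel empty: return the partial batch immediately
			return batch, true
		}
	}
	return batch, true
}

func main() {
	ch := make(chan int, 10)
	for i := 0; i < 3; i++ {
		ch <- i
	}
	batch, ok := drainUpTo(ch, 100)
	fmt.Println(len(batch), ok) // prints "3 true": partial batch, no waiting for the full limit
}
```

This mirrors the behaviour above: at least one message, then whatever else is already buffered, never waiting for the full limit.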
Thanks for explaining the use case. That said, yours is a valid use case, and there is a simple way to implement it using the new API (if I understand it correctly): a JetStream message carries metadata (parsed from its reply subject), which lets you check how many messages are still pending on the consumer:

```go
for {
	batch, err := cons.Fetch(100, jetstream.FetchMaxWait(5*time.Second))
	if err != nil {
		return err
	}
	msgs := make([]jetstream.Msg, 0)
	for msg := range batch.Messages() {
		msgs = append(msgs, msg)
		meta, err := msg.Metadata()
		if err != nil {
			return err
		}
		if meta.NumPending == 0 {
			if err := handleBatch(msgs); err != nil {
				return err
			}
			msgs = make([]jetstream.Msg, 0)
		}
	}
	handleBatch(msgs) // flush whatever is left of the batch (simplified: error ignored)
}
```
Alternatively, the same idea with the `Messages()` iterator:

```go
mc, err := cons.Messages()
if err != nil {
	return err
}
msgs := make([]jetstream.Msg, 0, 100)
for {
	msg, err := mc.Next()
	if err != nil {
		return err // simplified: e.g. ErrMsgIteratorClosed could be handled separately
	}
	msgs = append(msgs, msg)
	meta, err := msg.Metadata()
	if err != nil {
		return err
	}
	if meta.NumPending == 0 || len(msgs) == 100 {
		if err := handleBatch(msgs); err != nil {
			return err
		}
		msgs = make([]jetstream.Msg, 0, 100)
	}
}
```

These examples are somewhat simplified and skip acks, proper error handling etc., but they should demonstrate how to achieve low-latency batching. Also note that unless you require specific memory/performance optimizations, you don't really need to supply configuration options to `Messages()`.

Hope that helps; let me know if I missed the point or you need any additional info.
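The flush rule in the snippets above (deliver the batch when the server reports nothing pending, or when the local batch is full) can be distilled into a tiny pure-Go condition for illustration; `shouldFlush` is a hypothetical name, not part of the nats.go API:

```go
package main

import "fmt"

// shouldFlush mirrors the batching condition used above: flush when the
// message metadata reports no pending messages (low latency for sparse
// traffic), or when the local batch has reached its size limit (bounded memory).
func shouldFlush(numPending uint64, batchLen, batchLimit int) bool {
	return numPending == 0 || batchLen >= batchLimit
}

func main() {
	fmt.Println(shouldFlush(0, 3, 100))    // true: nothing pending, flush the small batch now
	fmt.Println(shouldFlush(42, 100, 100)) // true: batch is full
	fmt.Println(shouldFlush(42, 3, 100))   // false: keep accumulating
}
```

The first condition is what removes the delay for sparse traffic: a partial batch is handled as soon as the consumer runs dry, instead of waiting for `FetchMaxWait` to expire.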
Hey! Is there a proposal to change the Subscription API to have similar behavior as well?
Also, could someone help me understand the difference between using the two APIs above? Ideally they should behave the same, right?
Hello @kohlisid - we will not be changing the behavior of the old API, as there are users depending on it and that would be a breaking change. However, the old API does have a `FetchBatch` method:

```go
// FetchBatch pulls a batch of messages from a stream for a pull consumer.
// Unlike [Subscription.Fetch], it is non blocking and returns [MessageBatch],
// allowing to retrieve incoming messages from a channel.
// The returned channel is always closed after all messages for a batch have been
// delivered by the server - it is safe to iterate over it using range.
//
// To avoid using default JetStream timeout as fetch expiry time, use [nats.MaxWait]
// or [nats.Context] (with deadline set).
//
// This method will not return error in case of pull request expiry (even if there are no messages).
// Any other error encountered when receiving messages will cause FetchBatch to stop receiving new messages.
func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, error)
```

As to the differences between the old and new API - yes, there are some. The main goal was not to deliver the same API as the old one, but rather to enable several use cases using pull consumers. While continuous polling is optimized for throughput, with interleaving pull requests, the goal of
Proposed change
In the old API as documented here: https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#pull
It says that
Currently, in the new API, Fetch will always wait until it has received the full batch or until `jetstream.FetchMaxWait` expires. (It could also be that I'm not understanding the new API correctly and am missing something.)
Use case
To be able to use fetch without any delay under uneven traffic, where you sometimes get fewer messages than the batch size but still want to process them immediately.
For my use case we use a larger fetch to optimize write performance to a database that prefers larger batches when available (which allows much higher insertion performance). But a delay when only a few messages are available is not really acceptable for this use case.
Contribution
No response