
Fetch in the JetStream Simplified Client should behave similar to the old API. #1590

Open
joeledstrom opened this issue Mar 21, 2024 · 5 comments
Labels
proposal Enhancement idea or proposal

Comments

@joeledstrom

joeledstrom commented Mar 21, 2024

Proposed change

In the old API as documented here: https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#pull

It says that:

Fetch will return as soon as any message is available rather than wait until the full batch size is available, using a batch size of more than 1 allows for higher throughput when needed.

Currently, in the new API, Fetch always waits until it has received the full batch or until jetstream.FetchMaxWait expires.

(It could also be that I'm not understanding the new API correctly and am missing something.)
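For reference, this is roughly what I'm doing with the new API (a minimal sketch, not my actual code; cons is a jetstream.Consumer and handleBatch stands in for my batch-processing function):

// Sketch of my current usage: collect the whole batch before processing it.
// When fewer than 100 messages are pending, the range over batch.Messages()
// only finishes once FetchMaxWait expires, which is the delay described above.
for {
	batch, err := cons.Fetch(100, jetstream.FetchMaxWait(30*time.Second))
	if err != nil {
		return err
	}
	var msgs []jetstream.Msg
	for msg := range batch.Messages() {
		msgs = append(msgs, msg)
	}
	if len(msgs) > 0 {
		if err := handleBatch(msgs); err != nil {
			return err
		}
	}
}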

Use case

To be able to use Fetch without any delay when traffic is uneven: sometimes you get fewer messages than the batch size, but you still want to process them right away.

For my use case, we use a larger fetch to optimize write performance to a database that prefers larger batches when available (which allow much higher insertion performance). But a delay when only a few messages are available is not really acceptable for this use case.

Contribution

No response

@joeledstrom joeledstrom added the proposal Enhancement idea or proposal label Mar 21, 2024
@piotrpio
Collaborator

Hey @joeledstrom. The new API differs from the old Fetch() in that it's non-blocking: the method returns as soon as the pull request is published, and you can wait for incoming messages on the Messages() channel. Messages are sent on that channel as soon as the client receives them. So while the batch waits for the full set of messages or a timeout before closing the channel, you have access to the messages immediately.
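Roughly something like this (a sketch, assuming a jetstream.Consumer named cons; handleMsg is a placeholder for your per-message processing):

batch, err := cons.Fetch(100, jetstream.FetchMaxWait(30*time.Second))
if err != nil {
	return err
}
// Fetch returns right away; each message shows up on the channel as soon as
// the client receives it, even though the channel itself is only closed once
// the full batch has arrived or FetchMaxWait has expired.
for msg := range batch.Messages() {
	if err := handleMsg(msg); err != nil {
		return err
	}
	msg.Ack()
}
if err := batch.Error(); err != nil {
	return err
}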

@joeledstrom
Author

joeledstrom commented Apr 9, 2024

Hi again,

Sorry for the delay.

This doesn't really work well if you'd like to emulate the following functionality from the old API. Assume a handleBatch function that expects 1 to 100 messages to be processed as a batch. In my use case, the batch of messages is inserted into a database that can insert faster when given a large batch. (But if just one message is received, it should still be inserted immediately.)

With the old API you can pull between 1 and 100 messages using this code, which blocks until it has at least one message. How many messages you get depends on how many are pending on the consumer. This leads to great batched insertion performance into the database.

for {
	fetchCtx, cancel := context.WithTimeout(ctx, time.Second*30)
	msgs, err := sub.Fetch(100, nats.Context(fetchCtx))
	cancel()
	if err != nil && errors.Is(err, context.DeadlineExceeded) {
		continue
	}
	if err != nil {
		return err
	}
	fmt.Println("got num messages:", len(msgs))
	err = handleBatch(msgs)
	if err != nil {
		return err
	}
	// ack all messages or use AckAllPolicy
}

Running the above code, the output usually looks like this (i.e. variable batch sizes, but no delay when only 1 message arrives):

got num messages: 3
got num messages: 100
got num messages: 100
got num messages: 1
got num messages: 17
got num messages: 100

New API

This is the closest way I've found to emulate the old behaviour using the new API:

func consumerLoop(ctx context.Context, c jetstream.Consumer) error {
	fetchLimit := 100
	msgChan := make(chan jetstream.Msg, fetchLimit)
	errChan := make(chan error, 1)

	mc, err := c.Messages(jetstream.PullMaxMessages(fetchLimit), jetstream.PullExpiry(30*time.Second)) //, jetstream.PullHeartbeat(5*time.Second))
	if err != nil {
		return err
	}

	// save first error in errChan
	setError := func(err error) {
		select {
		case errChan <- err:
		default:
		}
	}

	context.AfterFunc(ctx, func() {
		setError(ctx.Err())
		mc.Drain() // causes mc.Next to return ErrMsgIteratorClosed after processing all leftover messages
	})

	go func() {
		defer close(msgChan)
		for {
			msg, err := mc.Next()
			if err != nil {
				setError(err)
				return
			}
			msgChan <- msg
		}
	}()

	for {
		msgs, ok := readBufferedMessages(msgChan, fetchLimit)
		if !ok {
			return fmt.Errorf("msgChan closed: %w", <-errChan)
		}
		err := handleBatch(msgs)
		if err != nil {
			return err
		}
		// ack all messages or use AckAllPolicy
	}
}

// readBufferedMessages blocks until it gets at least one message (sync channel read)
// then reads buffered messages from the channel up to the limit (async channel reads)
// aborts and returns false if the channel is closed
func readBufferedMessages(msgChan <-chan jetstream.Msg, limit int) ([]jetstream.Msg, bool) {
	var msgs []jetstream.Msg
	msg, ok := <-msgChan
	if !ok {
		return nil, false
	}
	msgs = append(msgs, msg)
	for len(msgs) < limit {
		select {
		case msg, ok := <-msgChan:
			if !ok {
				return msgs, false
			}
			msgs = append(msgs, msg)
			continue
		default:
			return msgs, true
		}
	}
	return msgs, true
}

Am I missing something? :)

@piotrpio
Collaborator

Thanks for explaining the use case. The reason we don't want Fetch() to behave like the old API is simple: if the publisher is slow, especially just a bit slower than the consumer (which is a very frequent case), the old Fetch API will almost always return just one message and immediately send another fetch, hurting performance and adding more stress to the server.

That said, yours is a valid use case and there is a simple way to implement it using the new API (if I understand it correctly): a JetStream message carries metadata in its reply subject, allowing you to check NumPending, the number of messages still pending delivery to the consumer. If you want to grab the currently available messages and batch them, you can simply check whether NumPending == 0. That should work fine with both Fetch() and Messages():

  • using Fetch():
for {
	batch, err := cons.Fetch(100, jetstream.FetchMaxWait(5*time.Second))
	if err != nil {
		return err
	}

	msgs := make([]jetstream.Msg, 0)
	for msg := range batch.Messages() {
		msgs = append(msgs, msg)
		meta, err := msg.Metadata()
		if err != nil {
			return err
		}
		if meta.NumPending == 0 {
			err = handleBatch(msgs)
			if err != nil {
				return err
			}
			msgs = make([]jetstream.Msg, 0)
		}
	}
	handleBatch(msgs)
}
  • or even simpler, using Messages():
mc, err := cons.Messages()
if err != nil {
	return err
}

msgs := make([]jetstream.Msg, 0, 100)
for {
	msg, err := mc.Next()
	if err != nil {
		return err // or handle the error; msg must not be used after a failed Next()
	}
	msgs = append(msgs, msg)
	meta, err := msg.Metadata()
	if err != nil {
		return err
	}
	if meta.NumPending == 0 || len(msgs) == 100 {
		err = handleBatch(msgs)
		if err != nil {
			return err
		}
		msgs = make([]jetstream.Msg, 0, 100)
	}
}

These examples are somewhat simplified and skip acks, proper error handling, etc., but they should demonstrate how to achieve low-latency batching. Also note that unless you require specific memory/performance optimizations, you don't really need to supply configuration options to Messages() (especially PullMaxMessages and PullExpiry) - the client will optimize batch sizes and send overlapping pull requests to the server to avoid slowdowns between subsequent fetches.

Hope that helps, let me know if I missed the point or you need any additional info.

@kohlisid

kohlisid commented Apr 17, 2024

Hey! Is there a proposal to change the following API for Subscription to have similar behavior as well?

// Fetch pulls a batch of messages from a stream for a pull consumer.
func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// Fetch sends a single request to retrieve given number of messages.
// It will wait up to provided expiry time if not all messages are available.
func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) {

Also, could someone help me understand the difference between using the two APIs above? Ideally they should work the same, right?

@piotrpio

@piotrpio
Collaborator

Hello @kohlisid - we will not be changing the behavior of the old API as there are users depending on it and that would be a breaking change. However, there is a FetchBatch method which works very similarly to the new API:

// FetchBatch pulls a batch of messages from a stream for a pull consumer.
// Unlike [Subscription.Fetch], it is non blocking and returns [MessageBatch],
// allowing to retrieve incoming messages from a channel.
// The returned channel is always closed after all messages for a batch have been
// delivered by the server - it is safe to iterate over it using range.
//
// To avoid using default JetStream timeout as fetch expiry time, use [nats.MaxWait]
// or [nats.Context] (with deadline set).
//
// This method will not return error in case of pull request expiry (even if there are no messages).
// Any other error encountered when receiving messages will cause FetchBatch to stop receiving new messages.
func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, error)
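A rough usage sketch (handleBatch being your own batching function):

for {
	batch, err := sub.FetchBatch(100, nats.MaxWait(30*time.Second))
	if err != nil {
		return err
	}
	var msgs []*nats.Msg
	// messages are delivered on the channel as the client receives them;
	// the channel is closed once the batch is complete or the request expires
	for msg := range batch.Messages() {
		msgs = append(msgs, msg)
	}
	if err := batch.Error(); err != nil {
		return err
	}
	if len(msgs) > 0 {
		if err := handleBatch(msgs); err != nil {
			return err
		}
	}
}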

As to the differences between the old and new API - yes, there are some. The main goal was not to deliver the same API as the old one, but rather to enable several use cases for pull consumers:

  • continuous push-like message polling - Consume and Messages methods
  • one-off pull request - Fetch, FetchBytes and FetchNoWait

While continuous polling is optimized for throughput, with interleaving pull requests, the goal of Fetch is simplicity and ease of control - thus we did not want to overcomplicate it by sending multiple pull requests, etc.
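For completeness, the continuous, push-like variant looks roughly like this (a sketch; what the callback does per message is up to you):

cc, err := cons.Consume(func(msg jetstream.Msg) {
	// the client keeps pull requests in flight in the background, so messages
	// are handed to this callback as soon as they become available
	msg.Ack()
})
if err != nil {
	return err
}
defer cc.Stop()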
