
[watermill-kafka] - cannot process events concurrently without ACKing the event #374

Open
nachogiljaldo opened this issue Jul 26, 2023 · 8 comments · May be fixed by ThreeDotsLabs/watermill-kafka#29


@nachogiljaldo

When using watermill-kafka's subscriber, only one event can be processed at a time unless you ACK the event, regardless of the number of partitions you may have. If you ACK the event, its offset is marked as done, and therefore so is the event.

The consequence is that if you want to guarantee at-least-once processing, you can only process one event at a time.
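
To make it concrete, the consumption pattern is roughly the following (a minimal sketch against watermill's message API; the log call stands in for real processing):

import (
	"context"
	"log"

	"github.com/ThreeDotsLabs/watermill/message"
)

// consume illustrates the ack-gated delivery described above.
func consume(ctx context.Context, subscriber message.Subscriber, topic string) error {
	messages, err := subscriber.Subscribe(ctx, topic)
	if err != nil {
		return err
	}
	for msg := range messages {
		log.Printf("processing %s", msg.UUID) // real processing would go here
		// Without this Ack, the subscriber does not deliver the next message,
		// no matter how many partitions are assigned.
		msg.Ack()
	}
	return nil
}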

I might be missing something, but I don't think I am; otherwise I would expect this test to pass:

Publish 20 events in 8 partitions and expect to process at least 2 events in 15s without ACK

func TestConcurrentProcessingDifferentPartitions(t *testing.T) {
	pub, sub := createPartitionedPubSub(t)
	topicName := "topic_" + watermill.NewUUID()

	var messagesToPublish []*message.Message
	for i := 0; i < 20; i++ {
		id := watermill.NewUUID()
		messagesToPublish = append(messagesToPublish, message.NewMessage(id, nil))
	}
	err := pub.Publish(topicName, messagesToPublish...)
	require.NoError(t, err, "cannot publish message")

	messages, err := sub.Subscribe(context.Background(), topicName)
	require.NoError(t, err)

	// Collect messages without ever ACKing them; the mutex guards the slice
	// against the concurrent read below.
	var mu sync.Mutex
	var receivedMessages []*message.Message
	go func() {
		for msg := range messages {
			mu.Lock()
			receivedMessages = append(receivedMessages, msg)
			mu.Unlock()
		}
	}()

	time.Sleep(15 * time.Second)
	mu.Lock()
	defer mu.Unlock()
	require.GreaterOrEqual(t, len(receivedMessages), 2)
}

My expectation would be one of these (and ideally both configurable):

  • I can concurrently process N messages, where N is the number of partitions assigned to the consumer
  • I can get as many as I ask for (with a max buffer size) and I need to ACK / NACK them explicitly

Am I missing something, or is this currently possible?

@sarkarshuvojit

sarkarshuvojit commented Sep 19, 2023

A single Kafka consumer is created per Subscribe call.

Your assumption is half-correct. You can process N messages in parallel if you have N partitions assigned to N consumers. Even if you have N partitions, a single Kafka consumer will only consume one message at a time, and there may also be some additional overhead when the consumer switches between partitions.

You would need N consumers in the same consumer group so that each consumer can bind to a partition and read from it in parallel. If you have fewer consumers than partitions, your consumers will do a lot of partition switching; if you have more consumers than partitions, some of them will sit idle and receive no messages at all.

Since the implementation creates a single consumer, it receives messages one by one, not in parallel as you expect.
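
To illustrate the workaround with the current API, you can create N subscribers in the same consumer group and let the broker spread the partitions across them. A rough sketch; the broker address, group name and handling are placeholders, and the SubscriberConfig fields are the ones I recall from the watermill-kafka README:

package main

import (
	"context"
	"log"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	logger := watermill.NewStdLogger(false, false)
	const topic = "example_topic" // placeholder topic

	// One subscriber per desired unit of parallelism, all in the same consumer
	// group, so the broker assigns each of them a subset of the partitions.
	for i := 0; i < 8; i++ {
		sub, err := kafka.NewSubscriber(kafka.SubscriberConfig{
			Brokers:       []string{"localhost:9092"}, // placeholder broker
			Unmarshaler:   kafka.DefaultMarshaler{},
			ConsumerGroup: "example_group", // placeholder group name
		}, logger)
		if err != nil {
			log.Fatal(err)
		}

		messages, err := sub.Subscribe(context.Background(), topic)
		if err != nil {
			log.Fatal(err)
		}

		go func(id int, messages <-chan *message.Message) {
			for msg := range messages {
				log.Printf("subscriber %d got %s", id, msg.UUID)
				msg.Ack()
			}
		}(i, messages)
	}

	select {} // block forever; real code would handle shutdown and Close()
}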

@nachogiljaldo
Author

Yes, that part is clearly understood. The subscriber works with an arbitrary number of partitions (it uses whatever the broker assigns to it), but it consumes only one message at a time, regardless of the number of partitions it has been assigned.

I understand a workaround would be creating N subscribers, but that seems to put a lot of load on the consumer:

  • you need to know the number of partitions
  • the number of partitions might change while the consumers run

I wonder if it would make sense to add some configuration for concurrency policies (a rough hypothetical sketch follows the list):

  • one (current behavior)
  • one per assigned partition
  • N from any partition, relying on the consumer to apply concurrency checks, if any
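
Something along these lines is what I have in mind. To be clear, none of these names exist in watermill-kafka today; this is purely a hypothetical sketch:

// Purely hypothetical: these types and fields do not exist in watermill-kafka
// today; they only illustrate the configuration surface I have in mind.
type ConcurrencyPolicy int

const (
	// Current behavior: one in-flight message per Subscribe call.
	ConcurrencySingle ConcurrencyPolicy = iota
	// One in-flight message per assigned partition.
	ConcurrencyPerPartition
	// Up to MaxInFlight messages from any partition; the consumer is
	// responsible for whatever ordering/concurrency checks it needs.
	ConcurrencyUnordered
)

type ConcurrencyConfig struct {
	Policy      ConcurrencyPolicy
	MaxInFlight int // only used with ConcurrencyUnordered
}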

@sarkarshuvojit

sarkarshuvojit commented Sep 19, 2023

Correct, I was not suggesting creating 3 Subscriptions but rather 3 lightweight consumers. I don't think that can currently be done.

Also, just curious since you mention the number of partitions changing while the consumer runs: in your use case, does the partition count change frequently?

A solution may be implementing a configuration where, internally, one Subscribe call creates N Kafka consumers and then fans the messages in to a single Go channel.
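
The fan-in part itself is straightforward in Go. A minimal sketch that merges the per-consumer channels into the single channel Subscribe already exposes (plain channels, nothing watermill-specific beyond the message type):

import (
	"sync"

	"github.com/ThreeDotsLabs/watermill/message"
)

// fanIn merges several per-consumer channels into the single channel that
// Subscribe exposes to the caller. The output closes once every input closes.
func fanIn(inputs ...<-chan *message.Message) <-chan *message.Message {
	out := make(chan *message.Message)

	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan *message.Message) {
			defer wg.Done()
			for msg := range in {
				out <- msg
			}
		}(in)
	}

	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}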

@nachogiljaldo
Author

Also, just curious since you mention the number of partitions changing while the consumer runs: in your use case, does the partition count change frequently?

Not really, just saying it could happen :)

A solution may be implementing a configuration where, internally, one Subscribe call creates N Kafka consumers and then fans the messages in to a single Go channel.

I will try to go ahead and create a PR for that in watermill-kafka in the next few weeks (I need to have a few days of "clear schedule"). Is that the way to go?

@sarkarshuvojit

Is that the way to go?

According to my limited knowledge, yes.

I will try to go ahead and create a PR for that in watermill-kafka in the next few weeks

Let me know if you need a hand, I'd be happy to help.

@OneCricketeer

Looking at your code, I see no consumer group provided. Therefore the consumer assignment protocol is used, and acking a record doesn't really do anything useful

@nachogiljaldo
Author

nachogiljaldo commented Sep 30, 2023

Looking at your code, I see no consumer group provided. Therefore the consumer assignment protocol is used, and acking a record doesn't really do anything useful

It was just an example to illustrate the problem, but you're right: because there's no consumer group, the ACK does not result in offset changes. However, without the ACK no further events are received, so it does indeed do something useful :)

@nachogiljaldo
Author

Hey @sarkarshuvojit I created ThreeDotsLabs/watermill-kafka#29 to try and implement a few different consumption methods, feel free to have a look.
