Duplicated messages when default rebalance happening with 2+ consumers and offset auto commit disabled #136

Open
blindspotbounty opened this issue Oct 3, 2023 · 2 comments

Currently, swift-kafka-client does not support a rebalance callback/event. However, such a hook is quite important for avoiding duplicated messages from the broker.
The typical scenario for many applications is to receive a message, process it, and then commit its offset:

for try await record in consumer1.messages {
    // process record
    try await consumer1.commit(record)
}

or, when messages are processed in bulk, to commit from time to time plus once more at the end of the consumer's lifetime:

var ctr = 0
for try await record in consumer1.messages {
    // process record
    ctr += 1
    if ctr % 10 == 0 {
        try await consumer1.commit(record)
    }
}
// plus a final commit of the last processed record when the consumer shuts down

Usually, to process messages in parallel, topics are divided into partitions that are consumed independently.
When a consumer is added to or removed from the group, those partitions have to be re-assigned so that they stay evenly distributed.
librdkafka has a default strategy that assigns and revokes partitions automatically; as part of that it also looks up the latest committed offset from the broker.
With automatic offset commits disabled, this leads to a race in two places:

  1. Between receiving a record from the messages stream and committing its offset
  2. Between polling a message from librdkafka and receiving it from the messages stream

In other words, the scenario is the following:

  1. a message is polled from librdkafka
  2. a rebalance happens and new partitions are assigned
  3. the already-polled message is still enqueued in the async stream and may be processed by the application
  4. the same message is later received again, because its offset was never committed to Kafka

Even though the library itself could partially lower the probability of this race, it cannot be eliminated while the application may still be processing messages. Furthermore, it would break Kafka EOS for transactions in the future.
Therefore, it would be nice if the library allowed the downstream application to deal with a rebalance in one of the following ways (or any other):

  1. Allow the application to observe the rebalance within the same sequence as messages, so that all polled messages are delivered first and only then the application receives the rebalance event (see the sketch after this list)
  2. Or purge/flush the messages sequence on rebalance and then deliver the rebalance event some other way (through an events sequence or a callback closure)
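
A purely hypothetical sketch of option 1: the messages sequence could carry rebalance notifications inline, so the application is guaranteed to see a rebalance only after every message that was polled before it. Neither KafkaConsumerElement nor the widened Element type exist in swift-kafka-client today; they are only here to illustrate the idea.

enum KafkaConsumerElement {
    case message(KafkaConsumerMessage)
    case rebalance(revoked: [KafkaPartition], assigned: [KafkaPartition])
}

// hypothetical: consumer1.messages with Element == KafkaConsumerElement
for try await element in consumer1.messages {
    switch element {
    case .message(let record):
        // process record
        try await consumer1.commit(record)
    case .rebalance(let revoked, _):
        // all messages polled before the rebalance have already been delivered,
        // so offsets/state for the revoked partitions can be finalized here
        // before another consumer starts reading them
        print("partitions revoked: \(revoked)")
    }
}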

To demonstrate the possible impact, I've used the following example in a branch (https://github.com/ordo-one/swift-kafka-client/tree/test-rebalance-lead-to-messages-resending; run it with swift run -c release Snapshot, based on the example for the previous cases).
As a first step, this sample produces 15_000_000 messages across 6 partitions.
It then starts 2 consumers in the same consumer group, with a delay of 20 seconds between the starts.

That code has two problems related to rebalancing:

  1. It may try to commit/store offsets for partitions that are no longer assigned to the first consumer (one of the possible outcomes). This can happen whether offsets are committed for every record or only from time to time; in the most reproducible scenario the process simply crashes with an exception (see the workaround sketch after this list):
Swift/ErrorType.swift:200: Fatal error: Error raised at top level: KafkaError.rdKafkaError: Local: Erroneous state Kafka/RDKafkaClient.swift:532
  2. It reads more messages than expected, i.e. more than 15_000_000. This is less reproducible but still bad (roughly 100k duplicates):
[Consumer1] Messages received: 15101688
[Consumer1] Messages received: 15101689
[Consumer1] Messages received: 15101690
[Consumer1] Messages received: 15101691
[Consumer1] Messages received: 15101692
[Consumer1] Messages received: 15101693
[Consumer1] Messages received: 15101694
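
Regarding the crash in (1): today the only mitigation available to the consuming code is to tolerate the commit failure, which avoids the crash but does nothing about the duplicated reads. A minimal sketch using the same API as above:

for try await record in consumer1.messages {
    // process record
    do {
        try await consumer1.commit(record)
    } catch {
        // the partition may have been revoked by a rebalance between receiving
        // the record and committing it, which surfaces as "Local: Erroneous state"
        print("commit failed, likely due to a rebalance: \(error)")
    }
}
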
blindspotbounty commented Dec 8, 2023

After switching to rd_kafka_consumer_poll, I've found a basic solution that should eliminate this problem and also help with further synchronisation.
My idea is to get rid of the intermediate message async stream and instead wrap rd_kafka_consumer_poll in the iterator itself.

In other words, for the basic use case it would look like this:

public struct KafkaConsumerMessages: Sendable, AsyncSequence {
...
    public struct AsyncIterator: AsyncIteratorProtocol {
        public func next() async throws -> Element? {
            while !Task.isCancelled {
                let action = self.stateMachineHolder.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() }

                switch action {
                case .poll(let client):
                    if let message = try client.consumerPoll() { // non-blocking call
                        return message
                    }
                    // no message available right now: poll again after a short pause
                    // (or e.g. hand off to a DispatchQueue and use a blocking poll)
                    try await Task.sleep(for: pollInterval)
                case .suspend:
                    try await Task.sleep(for: pollInterval)
                case .terminate:
                    return nil
                }
            }
            return nil
        }
    }
}
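
For reference, here is a minimal sketch of what a non-blocking consumerPoll() could look like on top of the librdkafka C API. It assumes the Crdkafka module exposing the C bindings and an already-subscribed rd_kafka_t handle; the real implementation in RDKafkaClient differs in details such as error mapping and message construction.

import Crdkafka
import Foundation

struct RDKafkaPollError: Error {
    let code: rd_kafka_resp_err_t
}

// Returns the payload of the next message, or nil if nothing is queued right now.
func pollOnce(_ handle: OpaquePointer) throws -> Data? {
    // timeout_ms == 0 makes rd_kafka_consumer_poll return immediately,
    // so the iterator above stays in control of when to suspend
    guard let messagePointer = rd_kafka_consumer_poll(handle, 0) else {
        return nil
    }
    defer { rd_kafka_message_destroy(messagePointer) }

    let message = messagePointer.pointee
    guard message.err == RD_KAFKA_RESP_ERR_NO_ERROR else {
        throw RDKafkaPollError(code: message.err)
    }
    guard let payload = message.payload else {
        return Data() // message with an empty payload
    }
    return Data(bytes: payload, count: Int(message.len))
}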

I believe this would solve several problems:

  1. It mostly eliminates duplicate messages (one duplicated message is still possible if the consuming code installs no custom rebalance callback)
  2. Offsets can be stored explicitly
  3. In the future, a rebalance event/callback can be synchronized, i.e. always delivered after all messages, allowing the client application to synchronize its work
  4. Misc: fewer allocations, less ARC traffic, less complexity, and less backpressure handling around librdkafka

@FranzBusch, @felixschlegel what do you think?

blindspotbounty commented:

I've put the suggestion above into a PR, together with a test that reproduces the issue: #158.

@blindspotbounty blindspotbounty changed the title Possibly duplicated messages when default rebalance happening and offset auto commit disabled Duplicated messages when default rebalance happening with 2+ consumers and offset auto commit disabled Dec 12, 2023