Currently, swift-kafka-client does not support a rebalance callback/event. However, this is quite important for avoiding duplicated messages from the broker.
The typical scenario for many applications is to receive a message, process it, and then commit the offset:
for try await record in consumer1.messages {
    // process record
    try await consumer1.commit(record)
}
or commit from time to time, plus once at the end of the lifetime, when bulk message processing is required:
var ctr = 0
for try await record in consumer1.messages {
    // process record
    ctr += 1
    if ctr % 10 == 0 {
        try await consumer1.commit(record)
    }
}
// + commit the last record at the end of the lifetime
Usually, to process messages in parallel, topics are divided into partitions that are served by consumers independently.
This leads to a requirement to re-assign those partitions whenever a consumer is added or removed, so that partitions stay evenly distributed.
librdkafka has a default strategy to assign or remove partitions automatically; however, it also seeks to the latest committed offset from the broker.
With automatic offset commit disabled, this leads to a race, specifically in two places:
Between receiving a record from the messages stream and committing it
Between polling a message and receiving it from the messages stream
In other words, the scenario is the following:
a message is polled from librdkafka
a rebalance happens and new partitions are assigned
but the message is already enqueued to the async stream and can still be processed
while the message will be received once again, because its offset was not committed to Kafka
Though the probability of such a race could be partially lowered in the library itself, it cannot be eliminated without the application's cooperation, since the application may still be processing messages. Furthermore, it will break Kafka EOS for transactions in the future.
Therefore, it would be nice if the library allowed the downstream application to deal with rebalances in one of the following ways (or any other):
Allow defining/listening to rebalances within the same sequence as messages, so that all messages reach the application and only then does the application receive the rebalance event
Or purge/flush the messages sequence on rebalance and then deliver the rebalance event some other way (through an events sequence or through a callback closure)
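The first option could be sketched as a unified element type delivered through one sequence. All names below are illustrative stubs, not part of the current swift-kafka-client API:

```swift
// Hypothetical sketch: rebalance notifications are delivered in-band with
// messages, so the application observes a rebalance only after every message
// that was polled before it. All types below are illustrative stubs.
struct KafkaConsumerMessage { let offset: Int } // stub
struct KafkaPartition { let id: Int }           // stub

enum ConsumerElement {
    case message(KafkaConsumerMessage)
    case rebalance([KafkaPartition]) // partitions being revoked/assigned
}

// What consuming code would do for each element (returns a description
// string here just to keep the sketch self-contained and testable).
func describe(_ element: ConsumerElement) -> String {
    switch element {
    case .message(let record):
        // process the record, then store/commit its offset
        return "message@\(record.offset)"
    case .rebalance(let partitions):
        // every earlier message has already been delivered, so flushing
        // stored offsets here cannot race with the re-assignment
        return "rebalance(\(partitions.count) partitions)"
    }
}
```

With this shape, the commit-before-reassignment ordering is guaranteed by the sequence itself rather than by timing.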
To demonstrate the possible impact, I've used the following example in a branch (https://github.com/ordo-one/swift-kafka-client/tree/test-rebalance-lead-to-messages-resending; to run: swift run -c release Snapshot, based on the example from previous cases).
As a first step, this sample produces 15_000_000 messages across 6 partitions.
Then it starts 2 consumers within the same consumer group, with a delay of 20 seconds between starts.
This code has two problems related to rebalance:
It may try to commit/store offsets for partitions that are no longer assigned to the first consumer (one possible outcome). This can happen whether committing every record or committing from time to time; the most reproducible scenario is simply a crash with an exception:
Swift/ErrorType.swift:200: Fatal error: Error raised at top level: KafkaError.rdKafkaError: Local: Erroneous state Kafka/RDKafkaClient.swift:532
It reads more messages than expected, i.e. more than 15_000_000. This is less reproducible but still bad (roughly 100k duplicates):
After switching to rd_kafka_consumer_poll, I found a basic solution that should eliminate this problem and allow further synchronisation.
My thought is to get rid of the buffered message async stream and instead wrap rd_kafka_consumer_poll in the iterator itself.
In other words, for the basic use case, it would be:
public struct KafkaConsumerMessages: Sendable, AsyncSequence {
    ...
    public struct AsyncIterator: AsyncIteratorProtocol {
        public func next() async throws -> Element? {
            while true {
                let action = self.stateMachineHolder.stateMachine.withLockedValue {
                    $0.nextConsumerPollLoopAction()
                }
                switch action {
                case .poll(let client):
                    if let message = try client.consumerPoll() { // non-blocking call
                        return message
                    }
                    // + e.g. DispatchQueue + blocking call
                case .suspend:
                    try await Task.sleep(for: pollInterval)
                case .terminate:
                    return nil
                }
            }
        }
    }
}
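For completeness, the poll-loop action driving that switch might look like the following. RDKafkaClient is stubbed here and all names are illustrative, mirroring the three cases handled in next():

```swift
// Stub standing in for the real librdkafka wrapper; a real client would
// issue a non-blocking rd_kafka_consumer_poll under the hood.
final class RDKafkaClient {
    func consumerPoll() throws -> String? { nil }
}

// Hypothetical action returned by the consumer state machine.
enum ConsumerPollLoopAction {
    case poll(client: RDKafkaClient) // try one non-blocking poll
    case suspend                     // nothing available: sleep for pollInterval
    case terminate                   // consumer closed: finish the sequence
}

// Whether `next()` should keep looping for a given action.
func keepsPolling(_ action: ConsumerPollLoopAction) -> Bool {
    switch action {
    case .poll, .suspend: return true
    case .terminate:      return false
    }
}
```

Since each next() call polls librdkafka directly, no messages sit buffered in an AsyncStream across a rebalance.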
I believe this would solve several problems:
It mostly eliminates the problem of duplicate messages (it will still be possible to get 1 duplicate message if there is no custom rebalance callback in the consuming code)
It allows explicitly storing offsets
In the future, the rebalance event/callback will be synchronized, i.e. always delivered after all messages, allowing the client application to synchronize its work
Misc: fewer allocations, less ARC traffic and complexity, and backpressure handled by librdkafka
I've put the suggestion above into a PR with a test that reproduces the issue: #158.
blindspotbounty changed the title from "Possibly duplicated messages when default rebalance happening and offset auto commit disabled" to "Duplicated messages when default rebalance happening with 2+ consumers and offset auto commit disabled" on Dec 12, 2023.