Listen and propagate RD_KAFKA_RESP_ERR__PARTITION_EOF #124

Open
blindspotbounty opened this issue Aug 28, 2023 · 7 comments


@blindspotbounty
Collaborator

Sometimes it is useful to know that a partition/topic has been read to EOF, and librdkafka supports this.
It has to be enabled explicitly with the property enable.partition.eof=true, and the resulting error then has to be handled, e.g.:

        for _ in 0..<maxEvents {
            let event = rd_kafka_queue_poll(self.queue, 0)
            defer { rd_kafka_event_destroy(event) }

            let rdEventType = rd_kafka_event_type(event)
            guard let eventType = RDKafkaEvent(rawValue: rdEventType) else {
                fatalError("Unsupported event type: \(rdEventType)")
            }

            switch eventType {
            case .error:
                let err = rd_kafka_event_error(event)
                if err == RD_KAFKA_RESP_ERR__PARTITION_EOF {
                    let topicPartition = rd_kafka_event_topic_partition(event)
                    if let topicPartition {
                    ... return events

The current API could probably be extended to cover this, e.g.:

public struct KafkaConsumerMessage {
    /// The topic that the message was received from.
    public var topic: String
    /// The partition that the message was received from.
    public var partition: KafkaPartition
    /// The key of the message.
    public var key: ByteBuffer?
    /// The body of the message.
    public var value: ByteBuffer
    /// The offset of the message in its partition.
    public var offset: KafkaOffset
    var eof: Bool {
        self.value.readableBytesView.isEmpty
    }

    /// Initialize ``KafkaConsumerMessage`` as EOF from an `rd_kafka_topic_partition_t` pointer.
    /// The value is left empty, which is what the `eof` property above checks for.
    internal init(topicPartitionPointer: UnsafePointer<rd_kafka_topic_partition_t>) {
        let topicPartition = topicPartitionPointer.pointee
        guard let topic = String(validatingUTF8: topicPartition.topic) else {
            fatalError("Received topic name that is non-valid UTF-8")
        }
        self.topic = topic
        self.partition = KafkaPartition(rawValue: Int(topicPartition.partition))
        self.offset = KafkaOffset(rawValue: Int(topicPartition.offset))
        self.value = ByteBuffer()
    }
}

or changed to enum:

enum KafkaConsumerMessage {
    case message(topic: String, partition: KafkaPartition, key: ByteBuffer?, value: ByteBuffer, offset: KafkaOffset)
    case eof(topic: String, partition: KafkaPartition, offset: KafkaOffset)
}
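
As a rough illustration of how the enum variant might be consumed, here is a minimal, self-contained sketch. The type name `ConsumerElementSketch` and the function `describe` are made up for this example, and plain `Int` / `[UInt8]` stand in for `KafkaPartition`, `KafkaOffset` and `ByteBuffer`, so this is not the library's API.

```swift
// Stand-in for the proposed enum. Int and [UInt8] replace KafkaPartition,
// KafkaOffset and ByteBuffer so the sketch compiles on its own (assumption).
enum ConsumerElementSketch {
    case message(topic: String, partition: Int, key: [UInt8]?, value: [UInt8], offset: Int)
    case eof(topic: String, partition: Int, offset: Int)
}

/// Returns a short description of the element; a real consumer would
/// process the payload or react to the EOF marker here instead.
func describe(_ element: ConsumerElementSketch) -> String {
    switch element {
    case .message(let topic, let partition, _, let value, let offset):
        return "message \(topic)[\(partition)]@\(offset): \(value.count) bytes"
    case .eof(let topic, let partition, let offset):
        return "eof \(topic)[\(partition)]@\(offset)"
    }
}
```

One possible argument for the enum over the struct variant: the struct's `eof` property is derived from an empty value, so a regular message with an empty payload would look the same as EOF, whereas the enum keeps the two cases distinct.
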
@FranzBusch
Contributor

@felixschlegel Can you check this?

@blindspotbounty
Collaborator Author

Hi @FranzBusch and @felixschlegel!

Could you please advise how I can help move this forward?

@felixschlegel
Contributor

IMO we should make the KafkaConsumerMessages AsyncSequence throw when we encounter RD_KAFKA_RESP_ERR__PARTITION_EOF since this is an option that is explicitly set by the user.

Alternatively, we could emit this as an event in the KafkaConsumerEvents AsyncSequence, though I am more inclined towards just throwing in KafkaConsumerMessages.

On that note: for this feature, we would also have to expose an option like isPartitionEOFEnabled on the KafkaConsumerConfiguration.
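
A minimal sketch of what such an option could look like, assuming the configuration is eventually flattened into the string key/value pairs that librdkafka expects. `ConsumerConfigurationSketch` and its `dictionary` property are hypothetical stand-ins, not the actual `KafkaConsumerConfiguration`; only the property name `enable.partition.eof` comes from librdkafka.

```swift
// Hypothetical stand-in for KafkaConsumerConfiguration; only the EOF flag is modelled.
struct ConsumerConfigurationSketch {
    /// When true, librdkafka is asked to report partition EOF. Off by default in this sketch.
    var isPartitionEOFEnabled: Bool = false

    /// Key/value pairs in the string form used for librdkafka configuration.
    var dictionary: [String: String] {
        ["enable.partition.eof": String(isPartitionEOFEnabled)]
    }
}
```

Keeping the flag opt-in would mean existing consumers never see EOF elements unless they ask for them.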

@FranzBusch
Contributor

Can we ever recover from a partition EOF error? Does this happen during a rebalance, or is it really a terminal state?

@felixschlegel
Contributor

> Can we ever recover from a partition EOF error? Does this happen during a rebalance, or is it really a terminal state?

Not a terminal state afaik; it just means that there are no more messages to read.

@FranzBusch
Contributor

Ah right, it just means that we hit the end of the partition, and we can still continue once more messages have been produced. So finishing the sequence is not the correct thing to do. I think the only thing we can really do is have the sequence yield an enum. If we emit the EOF event into a separate sequence, there might be reordering problems when consuming both sequences, where the EOF is observed before the last message.
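
To make the ordering argument concrete, here is a small sketch using `AsyncStream` from the Swift standard library as a stand-in for the consumer's message sequence. `StreamElement`, `makeStream` and `collect` are hypothetical names for this example. Because messages and EOF markers travel through the same sequence, the consumer sees EOF exactly where it was yielded, and iteration continues afterwards.

```swift
// Hypothetical element type; partition and offset are plain Ints for brevity.
enum StreamElement: Equatable {
    case message(partition: Int, offset: Int)
    case eof(partition: Int)
}

/// Builds a finite stream that stands in for the consumer's message sequence.
func makeStream() -> AsyncStream<StreamElement> {
    AsyncStream { continuation in
        continuation.yield(.message(partition: 0, offset: 41))
        continuation.yield(.message(partition: 0, offset: 42))
        continuation.yield(.eof(partition: 0))
        // More data is produced later, so consumption simply continues.
        continuation.yield(.message(partition: 0, offset: 43))
        continuation.finish()
    }
}

/// Collects elements in the order the consumer observes them.
func collect(_ stream: AsyncStream<StreamElement>) async -> [StreamElement] {
    var seen: [StreamElement] = []
    for await element in stream {
        seen.append(element)
    }
    return seen
}
```

With two separate sequences there is no such guarantee, since each would be drained by its own loop.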

Overall, I am wondering what the consumption pattern looks like and what you do when you hit EOF. @blindspotbounty could you provide some examples?

@blindspotbounty
Collaborator Author

Hi @FranzBusch, thank you for looking into this case!

It would be great to have such an enum in KafkaConsumerMessage to determine that a partition has been read up to the end.
The main thing for us is to determine that all messages have been read after a new service is added or an existing one has recovered/started successfully. For us, hitting partition EOF on all partitions means that we are now up to date and processing real-time data for those partitions.
Another, less important but still nice-to-have use case is related to offset commits: committing after every message is usually too slow, so we commit from time to time (after some number of entries or after some time has passed). However, when there are no more messages, we are left waiting for the next one. Partition EOF solves this, as we know that no more messages will be received from the partition for now and thus the offset can be committed.
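
The commit pattern described above could be sketched roughly as follows. `CommitBatcher` is a hypothetical helper (one instance per partition), offsets are plain `Int`s, and the time-based trigger is left out to keep the example short.

```swift
/// Hypothetical helper: decides when to commit, based on a message-count
/// threshold or on partition EOF. Intended to be used once per partition.
struct CommitBatcher {
    let threshold: Int
    private var uncommitted = 0
    private var lastSeenOffset: Int? = nil

    init(threshold: Int) {
        self.threshold = threshold
    }

    /// Records a message; returns the offset to commit once the threshold is reached.
    mutating func didReceiveMessage(offset: Int) -> Int? {
        lastSeenOffset = offset
        uncommitted += 1
        guard uncommitted >= threshold else { return nil }
        uncommitted = 0
        return offset
    }

    /// Called on partition EOF; flushes whatever is still pending.
    mutating func didReachEOF() -> Int? {
        guard uncommitted > 0, let offset = lastSeenOffset else { return nil }
        uncommitted = 0
        return offset
    }
}
```

Without the EOF signal, the tail of a batch that never reaches the threshold stays uncommitted until the next message or a timer fires.
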
Currently we solve the main problem by parsing per-partition statistics from librdkafka. That adds extra overhead for librdkafka to construct the JSON and for us to parse it. Additionally, it is less reliable because statistics are only provided periodically, so with a large message flow it can be hard to catch statistics with zero lag.
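
With EOF available in the message sequence, the "caught up on all partitions" check could be tracked without statistics, roughly like this. `CatchUpTracker` is a hypothetical helper, partitions are plain `Int`s, and rebalances (which would change the assigned set) are not handled in this sketch.

```swift
/// Hypothetical helper that tracks which assigned partitions have reported EOF.
struct CatchUpTracker {
    private var pending: Set<Int>

    init(assignedPartitions: Set<Int>) {
        self.pending = assignedPartitions
    }

    /// Marks a partition as read up to its end.
    mutating func didReachEOF(partition: Int) {
        pending.remove(partition)
    }

    /// True once every assigned partition has reported EOF at least once.
    var isCaughtUp: Bool {
        pending.isEmpty
    }
}
```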

Happy to help or provide more details if needed!
