Transactional API #78

Open
mr-swifter opened this issue Jun 29, 2023 · 1 comment
Comments

@mr-swifter
Collaborator

The transactional API is required in order to use the exactly-once semantics (EOS) provided by Kafka.

One idea for how to structure this:

  • Add a new actor, KafkaTransactionalProducer, that supports the regular KafkaProducer API plus the transactional API (beginTransaction, commitTransaction, abortTransaction, etc.). It may be simpler to make KafkaProducer a class and have the transactional producer inherit from it to avoid code duplication.
    • KafkaTransactionalProducer should offer send and sendOffset within a transaction.
  • KafkaTransactionalProducer should take transactional.id as a parameter.
  • KafkaTransactionalProducer should call rd_kafka_init_transactions(...) and make sure it is initialised and not fenced by another producer.
  • KafkaTransactionalProducer should handle retriable errors and try to recover. Such errors should possibly also be delivered to an optional callback (as a notification).
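
To make the lifecycle implied by the list above concrete, here is a minimal sketch of the call order (init, begin, send, then commit or abort). Everything in it is an assumption for illustration: the protocol name TransactionalProducing, the TransactionState enum and the in-memory stand-in are hypothetical, it uses a plain class rather than an actor to stay short, and it does not call librdkafka at all.

```swift
// Hypothetical sketch: none of these names come from the library itself.
enum TransactionState: Equatable {
    case uninitialized   // initTransactions() not yet called
    case ready           // initialised, no open transaction
    case inTransaction   // between begin and commit/abort
}

enum TransactionStateError: Error, Equatable {
    case invalidTransition(from: TransactionState, operation: String)
}

/// Hypothetical surface of the proposed transactional producer.
protocol TransactionalProducing {
    func initTransactions() throws
    func beginTransaction() throws
    func send(_ message: String, to topic: String) throws
    func commitTransaction() throws
    func abortTransaction() throws
}

/// In-memory stand-in that only enforces the call order; it talks to no broker.
final class InMemoryTransactionalProducer: TransactionalProducing {
    let transactionalID: String
    private(set) var state: TransactionState = .uninitialized
    private var pending: [(topic: String, message: String)] = []
    private(set) var committed: [(topic: String, message: String)] = []

    init(transactionalID: String) {
        self.transactionalID = transactionalID
    }

    /// Throws unless the producer is in the state the operation needs.
    private func require(_ expected: TransactionState, for operation: String) throws {
        guard state == expected else {
            throw TransactionStateError.invalidTransition(from: state, operation: operation)
        }
    }

    func initTransactions() throws {
        try require(.uninitialized, for: "initTransactions")
        state = .ready
    }

    func beginTransaction() throws {
        try require(.ready, for: "beginTransaction")
        state = .inTransaction
    }

    func send(_ message: String, to topic: String) throws {
        try require(.inTransaction, for: "send")
        pending.append((topic: topic, message: message))
    }

    func commitTransaction() throws {
        try require(.inTransaction, for: "commitTransaction")
        committed.append(contentsOf: pending)   // messages become visible only on commit
        pending.removeAll()
        state = .ready
    }

    func abortTransaction() throws {
        try require(.inTransaction, for: "abortTransaction")
        pending.removeAll()                     // aborted messages are dropped
        state = .ready
    }
}
```

In a real implementation the state checks would be backed by the results of the corresponding rd_kafka_* calls rather than by a local enum; the sketch only shows which operations are legal in which order.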
@blindspotbounty
Collaborator

We had a couple of discussions with @mr-swifter regarding a potential transactional API.

The easiest approach would be to expose the calls "as is". However, there is some logic that more or less everyone is likely to need, such as:

  1. Retry committing the transaction if the error is retriable, i.e. rd_kafka_error_is_retriable(error) == 1
  2. Abort the transaction if the error requires it, i.e. rd_kafka_error_txn_requires_abort(error) == 1, and return the corresponding error

Omitting insignificant implementation details and making it more abstract, we were using the following code for send/commit and, partially, for aborting a transaction.

func commitTransaction(attempts: UInt, timeoutMs: Int = -1 /* wait until transaction.timeout.ms */) async -> Result<Void, KafkaError> {
    for _ in 0..<attempts {
        let error = await forBlockingFunc {
            rd_kafka_commit_transaction(self.kafkaHandle, Int32(timeoutMs))
        }

        /* check whether the transaction completed successfully */
        if error == nil { return .success(()) }

        /* destroy the error on every exit path of this iteration */
        defer { rd_kafka_error_destroy(error) }

        /* check whether the error is retriable and, if so, retry */
        if rd_kafka_error_is_retriable(error) == 1 { continue }

        /* check whether the transaction needs to be aborted */
        if rd_kafka_error_txn_requires_abort(error) == 1 {
            // at this point we cannot retry; client code should begin the transaction from scratch
            let res = await abortTransaction()
            return /* TransactionAbortedError/InconsistentStateError from `res` */
        }

        let description = String(cString: rd_kafka_error_string(error))
        let isFatal = (rd_kafka_error_is_fatal(error) == 1) // a fatal error requires a producer restart
        return /* Error + isFatal + description */
    }
    return /* out of attempts error */
}
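
The control flow of the function above can be exercised without librdkafka by injecting the commit and abort steps as closures. The following is only a sketch under stated assumptions: TransactionErrorInfo, CommitOutcome and commitWithRetry are hypothetical names, the flags stand in for what rd_kafka_error_is_retriable, rd_kafka_error_txn_requires_abort and rd_kafka_error_is_fatal would report, and it is synchronous for brevity whereas the original is async.

```swift
/// Hypothetical value type standing in for the flags read from an rd_kafka error.
struct TransactionErrorInfo: Equatable {
    var isRetriable = false
    var requiresAbort = false
    var isFatal = false          // a fatal error would require a producer restart
    var description = ""
}

/// Hypothetical result of a commit attempt sequence.
enum CommitOutcome: Equatable {
    case committed
    case aborted(TransactionErrorInfo)   // transaction was aborted; caller starts over
    case failed(TransactionErrorInfo)    // neither retriable nor abortable
    case outOfAttempts
}

/// Retries `commit` up to `attempts` times, mirroring the decision order above:
/// success, then retriable, then requires-abort, then plain failure.
func commitWithRetry(
    attempts: UInt,
    commit: () -> TransactionErrorInfo?,   // nil means the commit succeeded
    abort: () -> Void
) -> CommitOutcome {
    for _ in 0..<attempts {
        guard let error = commit() else { return .committed }
        if error.isRetriable { continue }
        if error.requiresAbort {
            abort()
            return .aborted(error)
        }
        return .failed(error)
    }
    return .outOfAttempts
}
```

A caller could, for example, pass a closure that reports a retriable error twice and then succeeds; with attempts set to 3 the sketch returns .committed after the third call. Keeping the classification in one place like this is one way to share it between send, commit and abort, as suggested above.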
