
TopicConfiguration default values should not be set to librdkafka config #106

Open
blindspotbounty opened this issue Jul 31, 2023 · 1 comment

@blindspotbounty (Collaborator)

It is not obvious from librdkafka, but if we set the topic config to its default values, the config becomes marked as modified.
That may cause unexpected behaviour.

For example, I use the following config (dump):

Config for topic is 
auto.commit.enable: true
auto.commit.interval.ms: 60000
auto.offset.reset: largest
compression.codec: inherit
compression.level: -1
consume.callback.max.messages: 0
message.timeout.ms: 300000
offset.store.method: broker
offset.store.path: .
offset.store.sync.interval.ms: -1
partitioner: consistent_random
produce.offset.report: false
queuing.strategy: fifo
request.required.acks: -1
request.timeout.ms: 30000

After the code in RDKafkaTopicConfig:

        try topicConfig.dictionary.forEach { key, value in
            try Self.set(configPointer: configPointer, key: key, value: value)
        }

the dumped values are unchanged:

After Config for topic is 
auto.commit.enable: true
auto.commit.interval.ms: 60000
auto.offset.reset: largest
compression.codec: inherit
compression.level: -1
consume.callback.max.messages: 0
message.timeout.ms: 300000
offset.store.method: broker
offset.store.path: .
offset.store.sync.interval.ms: -1
partitioner: consistent_random
produce.offset.report: false
queuing.strategy: fifo
request.required.acks: -1
request.timeout.ms: 30000

But because librdkafka now treats these values as modified, I get the following error:

error: -[IntegrationTests.SwiftKafkaTests testProduceAndConsumeWithTransaction] : failed: caught error: "KafkaError.rdKafkaError: Local: Invalid argument or configuration SwiftKafka/RDKafkaTopicHandles.swift:76

As an experiment, I commented out the code above and everything works.

Just some more information: the reason for this is hidden in how librdkafka works. It keeps internal 'modified' flags that get set even when we assign the default values, but stay clear when the config is left untouched:

rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void) {
        rd_kafka_topic_conf_t *tconf = rd_calloc(1, sizeof(*tconf));
        rd_assert(RD_KAFKA_CONF_PROPS_IDX_MAX > sizeof(*tconf) &&
                  *"Increase RD_KAFKA_CONF_PROPS_IDX_MAX");
        rd_kafka_defaultconf_set(_RK_TOPIC, tconf);
        rd_kafka_anyconf_clear_all_is_modified(tconf);
        return tconf;
}

const char *rd_kafka_topic_conf_finalize(rd_kafka_type_t cltype,
                                         const rd_kafka_conf_t *conf,
                                         rd_kafka_topic_conf_t *tconf) {
...
        if (rd_kafka_topic_conf_is_modified(tconf, "acks")) {
...
        if (rd_kafka_topic_conf_is_modified(tconf, "queuing.strategy")) {
...
        if (conf->eos.transactional_id) {
                if (!rd_kafka_topic_conf_is_modified(
                        tconf, "message.timeout.ms"))
...

Furthermore, the topic config parameter accepts a null pointer (i.e. Swift nil), in which case librdkafka falls back to a default topic configuration.

I see several options here for the GSoC interface (though there might be more):

  1. Add the possibility to provide a nil topic configuration -> convert it to a null pointer for librdkafka.
  2. Make the values in the topic configuration optional and set only the values that were actually changed.
  3. Check the values in the topic configuration: if they equal librdkafka's defaults -> don't set them (a rough sketch follows below).
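For illustration, here is a rough sketch of option 3. Everything here is an assumption, not the actual implementation: the helper name, the way the dictionary is passed in, and the reuse of the existing Self.set helper. It only relies on the real librdkafka calls rd_kafka_topic_conf_new/get/destroy exposed through the Crdkafka module:

import Crdkafka

extension RDKafkaTopicConfig {
    // Hypothetical helper for option 3: only forward values that differ from
    // librdkafka's built-in defaults, so the internal "is modified" flags
    // stay clear for properties the user did not actually change.
    static func setNonDefaultValues(
        from dictionary: [String: String],
        configPointer: OpaquePointer
    ) throws {
        // A freshly created topic config still holds the built-in defaults.
        let defaults = rd_kafka_topic_conf_new()
        defer { rd_kafka_topic_conf_destroy(defaults) }

        for (key, value) in dictionary {
            var size: size_t = 0
            // First call with a nil destination only queries the required size.
            if rd_kafka_topic_conf_get(defaults, key, nil, &size) == RD_KAFKA_CONF_OK {
                var buffer = [CChar](repeating: 0, count: Int(size) + 1)
                rd_kafka_topic_conf_get(defaults, key, &buffer, &size)
                if String(cString: buffer) == value {
                    continue // equal to the default: leave the property untouched
                }
            }
            try Self.set(configPointer: configPointer, key: key, value: value)
        }
    }
}

Reading the defaults from a pristine rd_kafka_topic_conf_t keeps the comparison in sync with whatever librdkafka version is linked, instead of hard-coding default values in Swift.
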
@felixschlegel (Contributor)

[For the record]

The issue here is not, by itself, that swift-kafka-client sets the modified flag on the librdkafka configuration properties.
The problem is that, in the following code snippet:

(Source)

                if (conf->eos.transactional_id) {
                        if (!rd_kafka_topic_conf_is_modified(
                                tconf, "message.timeout.ms"))
                                tconf->message_timeout_ms =
                                    conf->eos.transaction_timeout_ms;
                        else if (tconf->message_timeout_ms >
                                 conf->eos.transaction_timeout_ms)
                                 return "`message.timeout.ms` must be set <= "
                                       "`transaction.timeout.ms`";
                }

We run into the

else if (tconf->message_timeout_ms >
         conf->eos.transaction_timeout_ms)

branch: the default values that we / librdkafka set (message.timeout.ms = 300000, transaction.timeout.ms = 60000) do not satisfy the requirement message.timeout.ms <= transaction.timeout.ms, so the finalize step returns the error seen above.


As the documentation for the message.timeout.ms property states:

Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time librdkafka may use to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. The message timeout is automatically adjusted to transaction.timeout.ms if transactional.id is configured.

We should make sure that we keep these values in sync ourselves once the Transactional Producer is implemented alongside the transaction.timeout.ms property.

This would take the shape of:

                        if (!rd_kafka_topic_conf_is_modified(
                                tconf, "message.timeout.ms"))
                                tconf->message_timeout_ms =
                                    conf->eos.transaction_timeout_ms;

(This automatically adjusts message.timeout.ms to be equal to transaction.timeout.ms when it has not been set explicitly.)
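A rough sketch of how that could look on the Swift side (every name below — the configuration type, its properties, the error case — is hypothetical, since neither the transactional producer nor a transaction.timeout.ms property exists in swift-kafka-client yet):

// Hypothetical sketch: mirror librdkafka's behaviour in the Swift configuration
// layer so the finalize check above can never fail.
struct TransactionalProducerConfiguration {
    var transactionTimeoutMilliseconds: Int = 60000
    // nil means "not set by the user"; in that case we mirror
    // transaction.timeout.ms, just like librdkafka does for an
    // unmodified message.timeout.ms.
    var messageTimeoutMilliseconds: Int? = nil

    enum ConfigurationError: Error {
        case messageTimeoutExceedsTransactionTimeout
    }

    func topicProperties() throws -> [String: String] {
        let messageTimeout = messageTimeoutMilliseconds ?? transactionTimeoutMilliseconds
        guard messageTimeout <= transactionTimeoutMilliseconds else {
            throw ConfigurationError.messageTimeoutExceedsTransactionTimeout
        }
        return ["message.timeout.ms": String(messageTimeout)]
    }
}

A user who never touches message.timeout.ms then ends up with exactly the value librdkafka would have chosen, while an explicitly conflicting value is rejected on the Swift side with a descriptive error instead of the generic "Local: Invalid argument or configuration".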
