diff --git a/spring-kafka-docs/src/main/asciidoc/index.adoc b/spring-kafka-docs/src/main/asciidoc/index.adoc index 8da473fd10..6226ded78c 100644 --- a/spring-kafka-docs/src/main/asciidoc/index.adoc +++ b/spring-kafka-docs/src/main/asciidoc/index.adoc @@ -47,12 +47,12 @@ The <> covers the core classes to develop a Kafka applicatio include::kafka.adoc[] +include::retrytopic.adoc[] + include::streams.adoc[] include::testing.adoc[] -include::retrytopic.adoc[] - [[tips-n-tricks]] == Tips, Tricks and Examples diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index e8ca7b3d58..d53c0e8b86 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -5,6 +5,10 @@ IMPORTANT: This is an experimental feature and the usual rule of no breaking API Users are encouraged to try out the feature and provide feedback via GitHub Issues or GitHub discussions. This is regarding the API only; the feature is considered to be complete, and robust. +Version 2.9 changed the mechanism to bootstrap infrastructure beans; see <> for the two mechanisms that are now required to bootstrap the feature. + +After these changes, we are intending to remove the experimental designation, probably in version 3.0. + Achieving non-blocking retry / dlt functionality with Kafka usually requires setting up extra topics and creating and configuring the corresponding listeners. Since 2.7 Spring for Apache Kafka offers support for that via the `@RetryableTopic` annotation and `RetryTopicConfiguration` class to simplify that bootstrapping. @@ -33,28 +37,27 @@ If one message's processing takes longer than the next message's back off period Also, for short delays (about 1s or less), the maintenance work the thread has to do, such as committing offsets, may delay the message processing execution. The precision can also be affected if the retry topic's consumer is handling more than one partition, because we rely on waking up the consumer from polling and having full pollTimeouts to make timing adjustments. -That being said, for consumers handling a single partition the message's processing should happen under 100ms after it's exact due time for most situations. +That being said, for consumers handling a single partition the message's processing should occur approximately at its exact due time for most situations. IMPORTANT: It is guaranteed that a message will never be processed before its due time. ===== Tuning the Delay Precision -The message's processing delay precision relies on two `ContainerProperties`: `ContainerProperties.pollTimeout` and `ContainerProperties.idlePartitionEventInterval`. -Both properties will be automatically set in the retry topic and dlt's `ListenerContainerFactory` to one quarter of the smallest delay value for that topic, with a minimum value of 250ms and a maximum value of 5000ms. -These values will only be set if the property has its default values - if you change either value yourself your change will not be overridden. -This way you can tune the precision and performance for the retry topics if you need to. - -NOTE: You can have separate `ListenerContainerFactory` instances for the main and retry topics - this way you can have different settings to better suit your needs, such as having a higher polling timeout setting for the main topics and a lower one for the retry topics. +Starting with version 2.9, it is no longer necessary to tune the precision because a task scheduler is used to resume the partition and wake the consumer, if necessary. +[[retry-config]] ==== Configuration -Starting with version 2.9, the `@EnableKafkaRetryTopic` annotation should be used in a `@Configuration` annotated class. +Starting with version 2.9, for default configuration, the `@EnableKafkaRetryTopic` annotation should be used in a `@Configuration` annotated class. This enables the feature to bootstrap properly and gives access to injecting some of the feature's components to be looked up at runtime. -Also, to configure the feature's components and global features, the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the appropriate methods overridden. -For more details refer to <>. NOTE: It is not necessary to also add `@EnableKafka`, if you add this annotation, because `@EnableKafkaRetryTopic` is meta-annotated with `@EnableKafka`. +Also, starting with that version, for more advanced configuration of the feature's components and global features, the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the appropriate methods overridden. +For more details refer to <>. + +IMPORTANT: Only one of the above techniques can be used, and only one `@Configuration` class can extend `RetryTopicConfigurationSupport`. + ===== Using the `@RetryableTopic` annotation To configure the retry topic and dlt for a `@KafkaListener` annotated method, you just have to add the `@RetryableTopic` annotation to it and Spring for Apache Kafka will bootstrap all the necessary topics and consumers with the default configurations. @@ -161,9 +164,9 @@ It's best to use a single `RetryTopicConfiguration` bean for configuration of su [[retry-topic-global-settings]] ===== Configuring Global Settings and Features -Since 2.9, the previous bean overriding approach for configuring components has been deprecated. -This does not change the `RetryTopicConfiguration` beans approach - only components' configurations. -Now the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the proper methods overridden. +Since 2.9, the previous bean overriding approach for configuring components has been removed (without deprecation, due to the aforementioned experimental nature of the API). +This does not change the `RetryTopicConfiguration` beans approach - only infrastructure components' configurations. +Now the `RetryTopicConfigurationSupport` class should be extended in a (single) `@Configuration` class, and the proper methods overridden. An example follows: ==== @@ -185,6 +188,15 @@ public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport { protected void manageNonBlockingFatalExceptions(List> nonBlockingFatalExceptions) { nonBlockingFatalExceptions.add(MyNonBlockingException.class); } + + @Override + protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) { + // Use the new 2.9 mechanism to avoid re-fetching the same records after a pause + customizersConfigurer.customizeErrorHandler(eh -> { + ((DefaultErrorHandler) eh).setSeekAfterError(false); + }); + } + } ---- ==== @@ -629,7 +641,7 @@ As an example the following implementation, in addition to the standard suffix, ---- public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory { - @Override + @Override public RetryTopicNamesProvider createRetryTopicNamesProvider( DestinationTopic.Properties properties) { @@ -728,7 +740,7 @@ In the latter the consumer ends the execution without forwarding the message. ---- @RetryableTopic(dltProcessingFailureStrategy = - DltStrategy.FAIL_ON_ERROR) + DltStrategy.FAIL_ON_ERROR) @KafkaListener(topics = "my-annotated-topic") public void processMessage(MyPojo message) { // ... message processing @@ -777,7 +789,7 @@ In this case after retrials are exhausted the processing simply ends. ---- @RetryableTopic(dltProcessingFailureStrategy = - DltStrategy.NO_DLT) + DltStrategy.NO_DLT) @KafkaListener(topics = "my-annotated-topic") public void processMessage(MyPojo message) { // ... message processing