Skip to content

Commit

Permalink
Docs.
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed Jul 6, 2022
1 parent 6a3b1ae commit 38cb445
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 18 deletions.
4 changes: 2 additions & 2 deletions spring-kafka-docs/src/main/asciidoc/index.adoc
Expand Up @@ -47,12 +47,12 @@ The <<kafka,main chapter>> covers the core classes to develop a Kafka applicatio

include::kafka.adoc[]

include::retrytopic.adoc[]

include::streams.adoc[]

include::testing.adoc[]

include::retrytopic.adoc[]

[[tips-n-tricks]]
== Tips, Tricks and Examples

Expand Down
44 changes: 28 additions & 16 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
Expand Up @@ -5,6 +5,10 @@ IMPORTANT: This is an experimental feature and the usual rule of no breaking API
Users are encouraged to try out the feature and provide feedback via GitHub Issues or GitHub discussions.
This is regarding the API only; the feature is considered to be complete, and robust.

Version 2.9 changed the mechanism to bootstrap infrastructure beans; see <<retry-config>> for the two mechanisms that are now required to bootstrap the feature.

After these changes, we are intending to remove the experimental designation, probably in version 3.0.

Achieving non-blocking retry / dlt functionality with Kafka usually requires setting up extra topics and creating and configuring the corresponding listeners.
Since 2.7 Spring for Apache Kafka offers support for that via the `@RetryableTopic` annotation and `RetryTopicConfiguration` class to simplify that bootstrapping.

Expand Down Expand Up @@ -33,28 +37,27 @@ If one message's processing takes longer than the next message's back off period
Also, for short delays (about 1s or less), the maintenance work the thread has to do, such as committing offsets, may delay the message processing execution.
The precision can also be affected if the retry topic's consumer is handling more than one partition, because we rely on waking up the consumer from polling and having full pollTimeouts to make timing adjustments.

That being said, for consumers handling a single partition the message's processing should happen under 100ms after it's exact due time for most situations.
That being said, for consumers handling a single partition the message's processing should occur approximately at its exact due time for most situations.

IMPORTANT: It is guaranteed that a message will never be processed before its due time.

===== Tuning the Delay Precision

The message's processing delay precision relies on two `ContainerProperties`: `ContainerProperties.pollTimeout` and `ContainerProperties.idlePartitionEventInterval`.
Both properties will be automatically set in the retry topic and dlt's `ListenerContainerFactory` to one quarter of the smallest delay value for that topic, with a minimum value of 250ms and a maximum value of 5000ms.
These values will only be set if the property has its default values - if you change either value yourself your change will not be overridden.
This way you can tune the precision and performance for the retry topics if you need to.

NOTE: You can have separate `ListenerContainerFactory` instances for the main and retry topics - this way you can have different settings to better suit your needs, such as having a higher polling timeout setting for the main topics and a lower one for the retry topics.
Starting with version 2.9, it is no longer necessary to tune the precision because a task scheduler is used to resume the partition and wake the consumer, if necessary.

[[retry-config]]
==== Configuration

Starting with version 2.9, the `@EnableKafkaRetryTopic` annotation should be used in a `@Configuration` annotated class.
Starting with version 2.9, for default configuration, the `@EnableKafkaRetryTopic` annotation should be used in a `@Configuration` annotated class.
This enables the feature to bootstrap properly and gives access to injecting some of the feature's components to be looked up at runtime.
Also, to configure the feature's components and global features, the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the appropriate methods overridden.
For more details refer to <<retry-topic-global-settings>>.

NOTE: It is not necessary to also add `@EnableKafka`, if you add this annotation, because `@EnableKafkaRetryTopic` is meta-annotated with `@EnableKafka`.

Also, starting with that version, for more advanced configuration of the feature's components and global features, the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the appropriate methods overridden.
For more details refer to <<retry-topic-global-settings>>.

IMPORTANT: Only one of the above techniques can be used, and only one `@Configuration` class can extend `RetryTopicConfigurationSupport`.

===== Using the `@RetryableTopic` annotation

To configure the retry topic and dlt for a `@KafkaListener` annotated method, you just have to add the `@RetryableTopic` annotation to it and Spring for Apache Kafka will bootstrap all the necessary topics and consumers with the default configurations.
Expand Down Expand Up @@ -161,9 +164,9 @@ It's best to use a single `RetryTopicConfiguration` bean for configuration of su
[[retry-topic-global-settings]]
===== Configuring Global Settings and Features

Since 2.9, the previous bean overriding approach for configuring components has been deprecated.
This does not change the `RetryTopicConfiguration` beans approach - only components' configurations.
Now the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the proper methods overridden.
Since 2.9, the previous bean overriding approach for configuring components has been removed (without deprecation, due to the aforementioned experimental nature of the API).
This does not change the `RetryTopicConfiguration` beans approach - only infrastructure components' configurations.
Now the `RetryTopicConfigurationSupport` class should be extended in a (single) `@Configuration` class, and the proper methods overridden.
An example follows:

====
Expand All @@ -185,6 +188,15 @@ public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
// Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
customizersConfigurer.customizeErrorHandler(eh -> {
((DefaultErrorHandler) eh).setSeekAfterError(false);
});
}
}
----
====
Expand Down Expand Up @@ -629,7 +641,7 @@ As an example the following implementation, in addition to the standard suffix,
----
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
@Override
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(
DestinationTopic.Properties properties) {
Expand Down Expand Up @@ -728,7 +740,7 @@ In the latter the consumer ends the execution without forwarding the message.
----
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.FAIL_ON_ERROR)
DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
Expand Down Expand Up @@ -777,7 +789,7 @@ In this case after retrials are exhausted the processing simply ends.
----
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.NO_DLT)
DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
Expand Down

0 comments on commit 38cb445

Please sign in to comment.