Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2226: Add RetryTopicConfigurationSupport #2227

Closed
wants to merge 11 commits into from
162 changes: 87 additions & 75 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ NOTE: You can have separate `ListenerContainerFactory` instances for the main an

==== Configuration

Since 2.9, the `@EnableKafkaRetryTopic` annotation should be used in a `@Configuration` annotated class instead of the regular `@EnableKafka` annotation, which it contains.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not good say like this: brings an impression that we must not use an @EnableKafka anymore.
And you even didn't introduce what is @EnableKafkaRetryTopic. The side effect with the @EnableKafka could be explained in the end of this paragraph. Probably in the simple "NOTE: When @EnableKafkaRetryTopic is used, the @EnableKafka can be omitted: the former is meta-annotated with the second."

This enables the feature to bootstrap properly and gives access to injecting some of the feature's components to be looked up at runtime.
Also, to configure the feature's components and global features, the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the appropriate methods overridden.
For more details refer to <<retry-topic-global-settings>>.

===== Using the `@RetryableTopic` annotation

To configure the retry topic and dlt for a `@KafkaListener` annotated method, you just have to add the `@RetryableTopic` annotation to it and Spring for Apache Kafka will bootstrap all the necessary topics and consumers with the default configurations.
Expand Down Expand Up @@ -76,7 +81,7 @@ public void processMessage(MyPojo message) {
----
====

NOTE: If you don't specify a kafkaTemplate name a bean with name `retryTopicDefaultKafkaTemplate` will be looked up.
NOTE: If you don't specify a kafkaTemplate name a bean with name `defaultRetryTopicKafkaTemplate` will be looked up.
If no bean is found an exception is thrown.

===== Using `RetryTopicConfiguration` beans
Expand Down Expand Up @@ -151,6 +156,40 @@ public KafkaTemplate<String, Object> kafkaTemplate() {
IMPORTANT: Multiple `@KafkaListener` annotations can be used for the same topic with or without manual partition assignment along with non-blocking retries, but only one configuration will be used for a given topic.
It's best to use a single `RetryTopicConfiguration` bean for configuration of such topics; if multiple `@RetryableTopic` annotations are being used for the same topic, all of them should have the same values, otherwise one of them will be applied to all of that topic's listeners and the other annotations' values will be ignored.

[[retry-topic-global-settings]]
===== Configuring Global Settings and Features

Since 2.9, the previous bean overriding approach for configuring components has been deprecated.
This does not change the `RetryTopicConfiguration` beans approach - only components' configurations.
Now the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the proper methods overridden.
An example follows:

====
tomazfernandes marked this conversation as resolved.
Show resolved Hide resolved
[source, java]
----

@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
tomazfernandes marked this conversation as resolved.
Show resolved Hide resolved

@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
.backOff(new FixedBackOff(3000, 3));
}

@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
}
----
====

IMPORTANT: When using this configuration approach, the `@EnableKafkaRetryTopic` annotation should not be used to prevent context failing to start due to duplicated beans.
Use the simple `@EnableKafka` annotation instead.

==== Features

Most of the features are available both for the `@RetryableTopic` annotation and the `RetryTopicConfiguration` beans.
Expand Down Expand Up @@ -315,24 +354,22 @@ NOTE: The default behavior is retrying on all exceptions and not traversing caus

Since 2.8.3 there's a global list of fatal exceptions which will cause the record to be sent to the DLT without any retries.
See <<default-eh>> for the default list of fatal exceptions.
You can add or remove exceptions to and from this list with:
You can add or remove exceptions to and from this list by overriding the `configureNonBlockingRetries` method in a `@Configuration` class that extends `RetryTopicConfigurationSupport`.
See <<retry-topic-global-settings>> for more information.

====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicationContext,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
ddtr.addNotRetryableExceptions(MyFatalException.class);
ddtr.removeNotRetryableException(ConversionException.class);
return ddtr;

@Override
protected void manageNonBlockingRetriesFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}

----
====

NOTE: To disable fatal exceptions' classification, clear the default list using the `setClassifications` method in `DefaultDestinationTopicResolver`.
NOTE: To disable fatal exceptions' classification, just clear the provided list.


===== Include and Exclude Topics
Expand Down Expand Up @@ -419,19 +456,18 @@ This is to avoid creation of excessively large messages (due to the stack trace

See <<dlpr-headers>> for more information.

To reconfigure the framework to use different settings for these properties, replace the standard `DeadLetterPublishingRecovererFactory` bean by adding a `recovererCustomizer`:
To reconfigure the framework to use different settings for these properties, configure a `DeadLetterPublishingRecoverer` customizer by overriding the `configureCustomizers` method in a `@Configuration` class that extends `RetryTopicConfigurationSupport`.
See <<retry-topic-global-settings>> for more details.

====
[source, java]
----
@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver) {
DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(resolver);
factory.setDeadLetterPublishingRecovererCustomizer(dlpr -> {
dlpr.appendOriginalHeaders(true);
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
dlpr.setAppendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
return factory;
}
----
====
Expand All @@ -444,32 +480,21 @@ Starting with version 2.8.4, if you wish to add custom headers (in addition to t
Starting in 2.8.4 you can configure the framework to use both blocking and non-blocking retries in conjunction.
For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.

To configure blocking retries you just need to add the exceptions you want to retry through the `addRetryableExceptions` method in the `ListenerContainerFactoryConfigurer` bean as follows.
The default policy is `FixedBackOff`, with nine retries and no delay between them.
Optionally, you can provide your own back off policy.
To configure blocking retries, override the `configureBlockingRetries` method in a `@Configuration` class that extends `RetryTopicConfigurationSupport` and add the exceptions you want to retry, along with the `BackOff` to be used.
The default `BackOff` is a `FixedBackOff` with no delay and 9 attempts.
artembilan marked this conversation as resolved.
Show resolved Hide resolved
See <<retry-topic-global-settings>> for more information.

====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
return lcfc;
}
----
====

If you need to further tune the exception classification, you can set your own `Map` of classifications through the `ListenerContainerFactoryConfigurer.setErrorHandlerCustomizer()` method, such as:
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetryException.class, MyOtherBlockingRetryException.class)
.backOff(new FixedBackOff(3000, 5));
}

====
[source, java]
----
lcfc.setErrorHandlerCustomizer(ceh -> ((DefaultErrorHandler) ceh).setClassifications(myClassificationsMap, myDefaultValue));
----
====

Expand All @@ -480,23 +505,16 @@ Here's an example with both configurations working together:
====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
return lcfc;
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class)
.backOff(new FixedBackOff(50, 3));
}

@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
ddtr.addNotRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class);
return ddtr;
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(ShouldSkipBothRetriesException.class);
}

----
Expand Down Expand Up @@ -590,9 +608,14 @@ More complex naming strategies can be accomplished by registering a bean that im
====
[source, java]
----
@Bean
public RetryTopicNamesProviderFactory myRetryNamingProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
@Override
protected RetryTopicComponentFactory createComponentFactory() {
return new RetryTopicComponentFactory() {
@Override
public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
}
};
}
----
====
Expand Down Expand Up @@ -811,21 +834,14 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo>

IMPORTANT: Since 2.8.3 you can use the same factory for retryable and non-retryable topics.

If you need to revert the factory configuration behavior to prior 2.8.3, you can replace the standard `RetryTopicConfigurer` bean and set `useLegacyFactoryConfigurer` to `true`, such as:
If you need to revert the factory configuration behavior to prior 2.8.3, you can override the `configureRetryTopicConfigurer` method of a `@Configuration` class that extends `RetryTopicConfigurationSupport` as explained in <<retry-topic-global-settings>> and set `useLegacyFactoryConfigurer` to `true`, such as:

====
[source, java]
----

@Bean(name = RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER)
public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
ListenerContainerFactoryResolver containerFactoryResolver,
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
BeanFactory beanFactory,
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
RetryTopicConfigurer retryTopicConfigurer = new RetryTopicConfigurer(destinationTopicProcessor, containerFactoryResolver, listenerContainerFactoryConfigurer, beanFactory, retryTopicNamesProviderFactory);
retryTopicConfigurer.useLegacyFactoryConfigurer(true);
return retryTopicConfigurer;
@Override
protected Consumer<RetryTopicConfigurer> configureRetryTopicConfigurer() {
return rtc -> rtc.useLegacyFactoryConfigurer(true);
}
----
====
Expand All @@ -840,14 +856,10 @@ For example, to change the logging level to WARN you might add:
====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer listenerContainer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer configurer = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
configurer.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler).setLogLevel(KafkaException.Level.WARN));
return configurer;
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeErrorHandler(commonErrorHandler ->
((DefaultErrorHandler) commonErrorHandler).setLogLevel(KafkaException.Level.WARN))
}
----
====
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.config.RetryTopicConfigurationSupport;

/**
* Enables the non-blocking topic-based delayed retries feature. To be used in
* {@link Configuration Configuration} classes as follows:
* <pre class="code">
*
* &#064;EnableKafkaRetryTopic
* &#064;Configuration
* public class AppConfig {
* }
*
* &#064;Component
* public class MyListener {
*
* &#064;RetryableTopic(fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC, backoff = @Backoff(4000))
* &#064;KafkaListener(topics = "myTopic")
* public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
* logger.info("Message {} received in topic {} ", message, receivedTopic);
* }
*
* &#064;DltHandler
* public void dltHandler(Object message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
* logger.info("Message {} received in dlt handler at topic {} ", message, receivedTopic);
* }
* </pre>
*
* To configure the feature's components, extend the {@link RetryTopicConfigurationSupport}
* class and override the appropriate methods. Then import the subclass using the
* {@link Import @Import} annotation on a {@link Configuration @Configuration} class,
* such as:
*
* <pre class="code">
*
* &#064;Configuration
* &#064;EnableKafka
* public class AppConfig extends RetryTopicConfigurationSupport {
* &#064;Override
* protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
* blockingRetries
* .retryOn(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class)
* .backOff(new FixedBackOff(50, 3));
* }
*
* &#064;Override
* protected void configureNonBlockingRetries(NonBlockingRetriesConfigurer nonBlockingRetries) {
* nonBlockingRetries
* .addToFatalExceptions(ShouldSkipBothRetriesException.class);
* }
* </pre>
*
* @author Tomaz Fernandes
* @since 2.9
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(RetryTopicConfigurationSupport.class)
@EnableKafka
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this has to be there. Or at least it has to be documented properly:
"For convenience the @EnableKafkaRetryTopic is marked also with the @EnableKafka since former cannot work without secondary"
(Or some other proper English language 😄 )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Now I'm thinking that since Boot's auto configuration already provides an @EnableKafka by default, maybe we'd run into the same issue of duplicated beans if users used this @EnableKafkaRetryTopic annotation?

I think that, considering this is a temporary solution until we add this as default in 3.0, maybe it'd be simpler to stick with the @EnableRetryTopic annotation, and then either move it to @EnableKafka or simply kill it and import the Support class directly for 3.0. WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, there is no harm: the @EnableKafka does a decent work to not duplicate beans. See its target KafkaBootstrapConfiguration.
I'm not sure that retry topics should be a default feature.
Therefore keep that @EnableKafkaRetryTopic is the way to go from now on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, if we're introducing a new annotation then, and probably 99% of the users will be using it in a Spring Boot application anyway, I'd stick with the simpler @EnableRetryTopic annotation that should be enough for them, and just add a note in the documentation that if users are not in a Spring Boot app they should also add @EnableKafka.

But no worries, I'll keep it as is and we can revise this down the road if needed.

Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just as a thought, of course we can look into this later, but what if for 3.0 we added this class to @EnableKafka, but also added a custom @Condition that would perform that annotation / bean lookup we had in the other PR, to only add this beans it if there's at least one RetryTopicConfiguration or @RetryableTopic present?

This way we'd keep the automatic feature enabling users currently have, while not encumbering other users with this if they don't use it.

Thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, that's not how other @Enable... features work. So, my vote for "No": if you are going to use @RetryableTopic, then add respective @Enable....
The condition could be considered in Spring Boot which indeed has some opinion.
The framework by itself has to be as straightforward as possible.
In the end it is just a library with minimal opinion 😉

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, makes sense. As you said in another post, eventually it'll be a good thing to better integrate this with Spring Boot's auto configuration in the future. But since this approach won't be available for Spring Boot prior to 3.0, we don't really need to worry about it now.

Thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still want to see some mentioning of this meta-annotation in the Javadocs of this EnableKafkaRetryTopic class.

public @interface EnableKafkaRetryTopic {
}