Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2226: Add RetryTopicConfigurationSupport #2227

Closed
wants to merge 11 commits into from
25 changes: 11 additions & 14 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ NOTE: You can have separate `ListenerContainerFactory` instances for the main an

==== Configuration

IMPORTANT: Since 2.9, the `@EnableRetryTopic` annotation should be used in a `@Configuration` annotated class.
Since 2.9, the `EnableKafkaRetryTopic` annotation should be used in a `@Configuration` annotated class.
This enables the feature to bootstrap properly and gives access to injecting some of the feature's components to be looked up at runtime.
Also, to configure the components and global features, the `RetryTopicConfigurationSupport` class should be extended and imported in a `@Configuration` class.
Also, to configure the feature's components and global features, the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the appropriate methods overridden.
For more details refer to <<retry-topic-global-settings>>.

===== Using the `@RetryableTopic` annotation
Expand Down Expand Up @@ -159,20 +159,17 @@ It's best to use a single `RetryTopicConfiguration` bean for configuration of su
[[retry-topic-global-settings]]
===== Configuring Global Settings and Features

Since 2.9, the previous bean-based configuration approach has been deprecated.
Instead, the `RetryTopicConfigurationSupport` class should be extended, the proper methods overridden, and imported in a `@Configuration` annotated class.
Since 2.9, the previous bean overriding approach for configuring components has been deprecated.
This does not change the `RetryTopicConfiguration` beans approach - only components' configurations.
Now the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the proper methods overridden.
An example follows:

====
tomazfernandes marked this conversation as resolved.
Show resolved Hide resolved
[source, java]
----

@EnableKafka
@Import(MyRetryTopicConfiguration.class)
@Configuration
public class KafkaConfiguration {
}

public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
tomazfernandes marked this conversation as resolved.
Show resolved Hide resolved

@Override
Expand All @@ -190,7 +187,8 @@ public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
----
====

NOTE: When using this approach, the `@EnableRetryTopic` annotation is not necessary.
IMPORTANT: When using this configuration approach, the `EnableKafkaRetryTopic` annotation should not be used to prevent context failing to start due to duplicated beans.
Use the simple `@EnableKafka` annotation instead.

==== Features

Expand Down Expand Up @@ -356,7 +354,7 @@ NOTE: The default behavior is retrying on all exceptions and not traversing caus

Since 2.8.3 there's a global list of fatal exceptions which will cause the record to be sent to the DLT without any retries.
See <<default-eh>> for the default list of fatal exceptions.
You can add or remove exceptions to and from this list by overriding the `configureNonBlockingRetries` method in a class that extends `RetryTopicConfigurationSupport`.
You can add or remove exceptions to and from this list by overriding the `configureNonBlockingRetries` method in a `@Configuration` class that extends `RetryTopicConfigurationSupport`.
See <<retry-topic-global-settings>> for more information.

====
Expand Down Expand Up @@ -458,7 +456,7 @@ This is to avoid creation of excessively large messages (due to the stack trace

See <<dlpr-headers>> for more information.

To reconfigure the framework to use different settings for these properties, configure a `DeadLetterPublishingRecoverer` customizer by overriding the `configureCustomizers` method in a class that extends `RetryTopicConfigurationSupport`.
To reconfigure the framework to use different settings for these properties, configure a `DeadLetterPublishingRecoverer` customizer by overriding the `configureCustomizers` method in a `@Configuration` class that extends `RetryTopicConfigurationSupport`.
See <<retry-topic-global-settings>> for more details.

====
Expand All @@ -482,8 +480,7 @@ Starting with version 2.8.4, if you wish to add custom headers (in addition to t
Starting in 2.8.4 you can configure the framework to use both blocking and non-blocking retries in conjunction.
For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.

To configure blocking retries, override the `configureBlockingRetries` method in a class that extends `RetryTopicConfigurationSupport` and add the exceptions you want to retry, along with the `BackOff` to be used.
Then `@Import` this subclass in a `@Configuration` class.
To configure blocking retries, override the `configureBlockingRetries` method in a `@Configuration` class that extends `RetryTopicConfigurationSupport` and add the exceptions you want to retry, along with the `BackOff` to be used.
The default `BackOff` is a `FixedBackOff` with no delay and 9 attempts.
artembilan marked this conversation as resolved.
Show resolved Hide resolved
See <<retry-topic-global-settings>> for more information.

Expand Down Expand Up @@ -837,7 +834,7 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo>

IMPORTANT: Since 2.8.3 you can use the same factory for retryable and non-retryable topics.

If you need to revert the factory configuration behavior to prior 2.8.3, you can override the `configureRetryTopicConfigurer` method of a class that extends `RetryTopicConfigurationSupport` as explained in <<retry-topic-global-settings>> and set `useLegacyFactoryConfigurer` to `true`, such as:
If you need to revert the factory configuration behavior to prior 2.8.3, you can override the `configureRetryTopicConfigurer` method of a `@Configuration` class that extends `RetryTopicConfigurationSupport` as explained in <<retry-topic-global-settings>> and set `useLegacyFactoryConfigurer` to `true`, such as:

====
[source, java]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* {@link Configuration Configuration} classes as follows:
* <pre class="code">
*
* &#064;EnableRetryTopic
* &#064;EnableKafkaRetryTopic
* &#064;Configuration
* public class AppConfig {
* }
Expand Down Expand Up @@ -60,12 +60,7 @@
*
* &#064;Configuration
* &#064;EnableKafka
* &#064;Import(MyRetryTopicConfigurationSupport.class)
* public class AppConfig {
* }
*
* public static class MyRetryTopicConfigurationSupport extends RetryTopicConfigurationSupport {
*
* public class AppConfig extends RetryTopicConfigurationSupport {
* &#064;Override
* protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
* blockingRetries
Expand All @@ -77,7 +72,7 @@
* protected void configureNonBlockingRetries(NonBlockingRetriesConfigurer nonBlockingRetries) {
* nonBlockingRetries
* .addToFatalExceptions(ShouldSkipBothRetriesException.class);
* } *
* }
* </pre>
*
* @author Tomaz Fernandes
Expand All @@ -87,5 +82,6 @@
@Target(ElementType.TYPE)
@Documented
@Import(RetryTopicConfigurationSupport.class)
public @interface EnableRetryTopic {
@EnableKafka
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this has to be there. Or at least it has to be documented properly:
"For convenience the @EnableKafkaRetryTopic is marked also with the @EnableKafka since former cannot work without secondary"
(Or some other proper English language 😄 )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Now I'm thinking that since Boot's auto configuration already provides an @EnableKafka by default, maybe we'd run into the same issue of duplicated beans if users used this @EnableKafkaRetryTopic annotation?

I think that, considering this is a temporary solution until we add this as default in 3.0, maybe it'd be simpler to stick with the @EnableRetryTopic annotation, and then either move it to @EnableKafka or simply kill it and import the Support class directly for 3.0. WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, there is no harm: the @EnableKafka does a decent work to not duplicate beans. See its target KafkaBootstrapConfiguration.
I'm not sure that retry topics should be a default feature.
Therefore keep that @EnableKafkaRetryTopic is the way to go from now on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, if we're introducing a new annotation then, and probably 99% of the users will be using it in a Spring Boot application anyway, I'd stick with the simpler @EnableRetryTopic annotation that should be enough for them, and just add a note in the documentation that if users are not in a Spring Boot app they should also add @EnableKafka.

But no worries, I'll keep it as is and we can revise this down the road if needed.

Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just as a thought, of course we can look into this later, but what if for 3.0 we added this class to @EnableKafka, but also added a custom @Condition that would perform that annotation / bean lookup we had in the other PR, to only add this beans it if there's at least one RetryTopicConfiguration or @RetryableTopic present?

This way we'd keep the automatic feature enabling users currently have, while not encumbering other users with this if they don't use it.

Thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, that's not how other @Enable... features work. So, my vote for "No": if you are going to use @RetryableTopic, then add respective @Enable....
The condition could be considered in Spring Boot which indeed has some opinion.
The framework by itself has to be as straightforward as possible.
In the end it is just a library with minimal opinion 😉

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, makes sense. As you said in another post, eventually it'll be a good thing to better integrate this with Spring Boot's auto configuration in the future. But since this approach won't be available for Spring Boot prior to 3.0, we don't really need to worry about it now.

Thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still want to see some mentioning of this meta-annotation in the Javadocs of this EnableKafkaRetryTopic class.

public @interface EnableKafkaRetryTopic {
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
import java.time.Clock;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.KafkaBackOffManagerFactory;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.KafkaConsumerTimingAdjuster;
import org.springframework.kafka.listener.ListenerContainerRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.PartitionPausingBackOffManagerFactory;
Expand All @@ -41,7 +40,6 @@
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory;
import org.springframework.kafka.retrytopic.SuffixingRetryTopicNamesProviderFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
* Provide the component instances that will be used with
Expand Down Expand Up @@ -77,9 +75,10 @@ public class RetryTopicComponentFactory {
* @return the instance.
*/
public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
ListenerContainerFactoryResolver factoryResolver,
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
ListenerContainerFactoryResolver factoryResolver,
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {

return new RetryTopicConfigurer(destinationTopicProcessor, factoryResolver,
listenerContainerFactoryConfigurer, retryTopicNamesProviderFactory);
}
Expand Down Expand Up @@ -116,6 +115,7 @@ public DestinationTopicResolver destinationTopicResolver() {
*/
public DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory(
DestinationTopicResolver destinationTopicResolver) {

return new DeadLetterPublishingRecovererFactory(destinationTopicResolver);
}

Expand All @@ -142,8 +142,8 @@ public ListenerContainerFactoryResolver listenerContainerFactoryResolver(BeanFac
* @return the instance.
*/
public ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
Clock clock) {
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
Clock clock) {

tomazfernandes marked this conversation as resolved.
Show resolved Hide resolved
return new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
}
Expand All @@ -162,19 +162,15 @@ public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
* {@link KafkaConsumerBackoffManager} instance used to back off the partitions.
* @param registry the {@link ListenerContainerRegistry} used to fetch the
* {@link MessageListenerContainer}.
* @param context the context.
* @return the instance.
*/
public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry) {
return new PartitionPausingBackOffManagerFactory(registry);
}
public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry,
ApplicationContext context) {

/**
* Create the {@link TaskExecutor} that will be used in the
* {@link KafkaConsumerTimingAdjuster}.
* @return the task executor.
*/
public TaskExecutor taskExecutor() {
return new ThreadPoolTaskExecutor();
PartitionPausingBackOffManagerFactory factory = new PartitionPausingBackOffManagerFactory(registry);
factory.setApplicationContext(context);
return factory;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.kafka.annotation.EnableRetryTopic;
import org.springframework.kafka.annotation.EnableKafkaRetryTopic;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
Expand All @@ -57,20 +57,19 @@
import org.springframework.kafka.retrytopic.RetryTopicInternalBeanNames;
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory;
import org.springframework.kafka.support.JavaUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

/**
* This is the main class providing the configuration behind the non-blocking,
* topic-based delayed retries feature. It is typically imported by adding
* {@link EnableRetryTopic @EnableRetryTopic} to an application
* {@link EnableKafkaRetryTopic @EnableRetryTopic} to an application
* {@link Configuration @Configuration} class. An alternative more advanced option
* is to extend directly from this class and override methods as necessary, remembering
* to add {@link Configuration @Configuration} to the subclass and {@link Bean @Bean}
* to overridden {@link Bean @Bean} methods. For more details see the javadoc of
* {@link EnableRetryTopic @EnableRetryTopic}.
* {@link EnableKafkaRetryTopic @EnableRetryTopic}.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some typo. Or missed renaming...

*
* @author Tomaz Fernandes
* @since 2.9
Expand All @@ -94,10 +93,10 @@ public class RetryTopicConfigurationSupport {
*/
@Bean(name = RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME)
public RetryTopicConfigurer retryTopicConfigurer(@Qualifier(KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME)
KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
@Qualifier(RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME)
DestinationTopicResolver destinationTopicResolver,
BeanFactory beanFactory) {
KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
@Qualifier(RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME)
DestinationTopicResolver destinationTopicResolver,
BeanFactory beanFactory) {

DestinationTopicProcessor destinationTopicProcessor = this.componentFactory
.destinationTopicProcessor(destinationTopicResolver);
Expand Down Expand Up @@ -141,6 +140,7 @@ protected Consumer<RetryTopicConfigurer> configureRetryTopicConfigurer() {
*/
private void processDeadLetterPublishingContainerFactory(
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory) {

CustomizersConfigurer customizersConfigurer = new CustomizersConfigurer();
tomazfernandes marked this conversation as resolved.
Show resolved Hide resolved
configureCustomizers(customizersConfigurer);
JavaUtils.INSTANCE
Expand Down Expand Up @@ -273,61 +273,38 @@ protected Consumer<DestinationTopicResolver> configureDestinationTopicResolver()
* and return a different {@link KafkaBackOffManagerFactory}.
* @param registry the {@link ListenerContainerRegistry} to be used to fetch the
* {@link MessageListenerContainer} at runtime to be backed off.
* @param taskExecutor the {@link TaskExecutor} to be used with the
* {@link KafkaConsumerTimingAdjuster}.
* @param context the context.
* @return the instance.
*/
@Bean(name = KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME)
public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(
@Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
ListenerContainerRegistry registry,
@Qualifier(BACK_OFF_MANAGER_THREAD_EXECUTOR_BEAN_NAME) TaskExecutor taskExecutor) {
ListenerContainerRegistry registry, ApplicationContext context) {

KafkaBackOffManagerFactory backOffManagerFactory =
this.componentFactory.kafkaBackOffManagerFactory(registry);
this.componentFactory.kafkaBackOffManagerFactory(registry, context);
JavaUtils.INSTANCE.acceptIfInstanceOf(PartitionPausingBackOffManagerFactory.class, backOffManagerFactory,
factory -> configurePartitionPausingFactory(taskExecutor, factory));
this::configurePartitionPausingFactory);
return backOffManagerFactory.create();
}

/**
* Internal method for processing the {@link PartitionPausingBackOffManagerFactory}.
* @param taskExecutor the {@link TaskExecutor} instance to be used with
* {@link WakingKafkaConsumerTimingAdjuster}. Consider overriding the
* {@link #configureKafkaBackOffManager} method for furher customization.
* @param factory the factory instance.
*/
private void configurePartitionPausingFactory(TaskExecutor taskExecutor,
PartitionPausingBackOffManagerFactory factory) {
private void configurePartitionPausingFactory(PartitionPausingBackOffManagerFactory factory) {

KafkaBackOffManagerConfigurer configurer = new KafkaBackOffManagerConfigurer();
configureKafkaBackOffManager(configurer);
Assert.isTrue(!configurer.timingAdjustmentEnabled
|| configurer.maxThreadPoolSize == null
|| ThreadPoolTaskExecutor.class.isAssignableFrom(taskExecutor.getClass()),
() -> "TaskExecutor must be an instance of ThreadPoolTaskExecutor to set maxThreadPoolSize");
factory.setTimingAdjustmentEnabled(configurer.timingAdjustmentEnabled);
JavaUtils.INSTANCE
.acceptIfNotNull(configurer.maxThreadPoolSize, poolSize -> ((ThreadPoolTaskExecutor) taskExecutor)
.setMaxPoolSize(poolSize))
.acceptIfCondition(configurer.timingAdjustmentEnabled, taskExecutor, factory::setTaskExecutor)
.acceptIfNotNull(configurer.maxThreadPoolSize, factory::setMaxThreadPoolSize)
.acceptIfNotNull(configurer.taskExecutor, factory::setTaskExecutor)
.acceptIfNotNull(configurer.clock, factory::setClock);
}

/**
* Create the {@link TaskExecutor} instance that will be used with the
* {@link WakingKafkaConsumerTimingAdjuster}, if timing adjustment is enabled.
* @return the instance.
*/
@Bean(name = BACK_OFF_MANAGER_THREAD_EXECUTOR_BEAN_NAME)
public TaskExecutor backoffManagerTaskExecutor() {
KafkaBackOffManagerConfigurer configurer = new KafkaBackOffManagerConfigurer();
configureKafkaBackOffManager(configurer);
return configurer.timingAdjustmentEnabled
? this.componentFactory.taskExecutor()
: task -> {
};
}

/**
* Override this method to configure the {@link KafkaConsumerBackoffManager}.
* @param backOffManagerConfigurer a {@link KafkaBackOffManagerConfigurer}.
Expand Down Expand Up @@ -395,10 +372,12 @@ public static class KafkaBackOffManagerConfigurer {

boolean timingAdjustmentEnabled = true;

private Integer maxThreadPoolSize = null;
private Integer maxThreadPoolSize;

private Clock clock;

private TaskExecutor taskExecutor;

/**
* Disable timing adjustment for the delays. By choosing this option records
* won't be processed exactly at the proper time. It's guaranteed however that
Expand Down Expand Up @@ -426,6 +405,17 @@ public KafkaBackOffManagerConfigurer setMaxThreadPoolSize(int maxThreadPoolSize)
return this;
}

/**
* Provide an {@link TaskExecutor} instance to be used with the
* {@link WakingKafkaConsumerTimingAdjuster}.
* @param taskExecutor the task executor instance.
* @return the configurer.
*/
public KafkaBackOffManagerConfigurer setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
return this;
}

/**
* Set the {@link Clock} instance to be used with the
* {@link KafkaConsumerBackoffManager}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -86,6 +88,10 @@ protected <T> T getBean(String beanName, Class<T> beanClass) {
return this.applicationContext.getBean(beanName, beanClass);
}

protected void addApplicationListener(ApplicationListener<?> applicationListener) {
((ConfigurableApplicationContext) this.applicationContext).addApplicationListener(applicationListener);
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
Expand Down