Skip to content

Commit

Permalink
GH-2239: Fix Boot AutoConfiguration
Browse files Browse the repository at this point in the history
See #2239

The previous commit removed the implicit bootstrap in favor of enforcing the
user to use the `@EnableKafkaRretryTopic` or explicitly extend
`RetryTopicConfigurationSupport`.

Unfortunately, this breaks Spring Boot because it can auto configure a
`RetryTopicConfiguration` bean, which means the infrastructure beans are required.

Fallback to late binding of the infrastructure beans if a `RetryTopicConfiguration`
bean is found in the application context.

Tested with a Boot app.

**cherry-pick to main**
  • Loading branch information
garyrussell authored and artembilan committed Jul 11, 2022
1 parent edfbd1d commit 2f50088
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 14 deletions.
Expand Up @@ -62,6 +62,7 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.OrderComparator;
import org.springframework.core.Ordered;
Expand All @@ -83,11 +84,15 @@
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.ContainerGroupSequencer;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.retrytopic.DestinationTopicResolver;
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
import org.springframework.kafka.retrytopic.RetryTopicSchedulerWrapper;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.GenericMessageConverter;
Expand All @@ -96,6 +101,8 @@
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
Expand Down Expand Up @@ -516,14 +523,50 @@ private RetryTopicConfigurer getRetryTopicConfigurer() {
.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class);
}
catch (NoSuchBeanDefinitionException ex) {
this.logger.error("A 'RetryTopicConfigurer' with name "
+ RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME + "is required.");
throw ex;
this.retryTopicConfigurer = createDefaultConfigurer();
}
}
return this.retryTopicConfigurer;
}

private RetryTopicConfigurer createDefaultConfigurer() {
if (this.applicationContext instanceof GenericApplicationContext) {
GenericApplicationContext gac = (GenericApplicationContext) this.applicationContext;
gac.registerBean(
RetryTopicBeanNames.DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME,
RetryTopicConfigurationSupport.class,
() -> new RetryTopicConfigurationSupport());
RetryTopicConfigurationSupport rtcs = this.applicationContext.getBean(
RetryTopicBeanNames.DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME,
RetryTopicConfigurationSupport.class);
DestinationTopicResolver destResolver = rtcs.destinationTopicResolver();
RetryTopicSchedulerWrapper schedW = gac.getBeanProvider(RetryTopicSchedulerWrapper.class).getIfUnique();
TaskScheduler sched = gac.getBeanProvider(TaskScheduler.class).getIfUnique();
if (schedW == null && sched == null) {
RetryTopicSchedulerWrapper newSchedW = new RetryTopicSchedulerWrapper(new ThreadPoolTaskScheduler());
gac.registerBean(RetryTopicBeanNames.DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME,
RetryTopicSchedulerWrapper.class, () -> newSchedW);
schedW = gac.getBean(RetryTopicSchedulerWrapper.class);
}
KafkaConsumerBackoffManager bom =
rtcs.kafkaConsumerBackoffManager(this.applicationContext, this.registrar.getEndpointRegistry(),
schedW, sched);
RetryTopicConfigurer rtc = rtcs.retryTopicConfigurer(bom, destResolver, this.beanFactory);

gac.registerBean(RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME, DestinationTopicResolver.class,
() -> destResolver);
gac.registerBean(KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME,
KafkaConsumerBackoffManager.class, () -> bom);
gac.registerBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class,
() -> rtc);

return this.beanFactory
.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class);
}
throw new IllegalStateException("When there is no RetryTopicConfigurationSupport bean, the application context "
+ "must be a GenericApplicationContext");
}

private Method checkProxy(Method methodArg, Object bean) {
Method method = methodArg;
if (AopUtils.isJdkDynamicProxy(bean)) {
Expand Down
Expand Up @@ -87,7 +87,7 @@ protected <T> T getBean(String beanName, Class<T> beanClass) {
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
public final void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}

Expand Down
Expand Up @@ -16,6 +16,7 @@

package org.springframework.kafka.listener;

import org.springframework.context.ApplicationContext;
import org.springframework.util.Assert;

/**
Expand All @@ -32,9 +33,13 @@ public class ContainerPartitionPausingBackOffManagerFactory extends AbstractKafk
/**
* Construct an instance with the provided properties.
* @param listenerContainerRegistry the registry.
* @param applicationContext the application context.
*/
public ContainerPartitionPausingBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry) {
public ContainerPartitionPausingBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry,
ApplicationContext applicationContext) {

super(listenerContainerRegistry);
setApplicationContext(applicationContext);
}

@Override
Expand Down
Expand Up @@ -19,13 +19,21 @@
/**
* The bean names for the non-blocking topic-based delayed retries feature.
* @author Tomaz Fernandes
* @author Gary Russell
* @since 2.9
*/
public final class RetryTopicBeanNames {

private RetryTopicBeanNames() {
}

/**
* The bean name of an internally managed retry topic configuration support, if
* needed.
*/
public static final String DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME =
"org.springframework.kafka.retrytopic.internalRetryTopicConfigurationSupport";

/**
* The bean name of the internally managed retry topic configurer.
*/
Expand All @@ -50,4 +58,10 @@ private RetryTopicBeanNames() {
public static final String DEFAULT_KAFKA_TEMPLATE_BEAN_NAME =
"defaultRetryTopicKafkaTemplate";

/**
* The bean name of the internally registered scheduler wrapper, if needed.
*/
public static final String DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME =
"defaultRetryTopicKafkaTemplate";

}
Expand Up @@ -19,6 +19,7 @@
import java.time.Clock;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory;
Expand Down Expand Up @@ -150,10 +151,13 @@ public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
* {@link KafkaConsumerBackoffManager} instance used to back off the partitions.
* @param registry the {@link ListenerContainerRegistry} used to fetch the
* {@link MessageListenerContainer}.
* @param applicationContext the application context.
* @return the instance.
*/
public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry) {
return new ContainerPartitionPausingBackOffManagerFactory(registry);
public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry,
ApplicationContext applicationContext) {

return new ContainerPartitionPausingBackOffManagerFactory(registry, applicationContext);
}

/**
Expand Down
Expand Up @@ -27,6 +27,7 @@

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaRetryTopic;
Expand Down Expand Up @@ -72,7 +73,7 @@ public class RetryTopicConfigurationSupport {

private final RetryTopicComponentFactory componentFactory = createComponentFactory();

protected RetryTopicConfigurationSupport() {
public RetryTopicConfigurationSupport() {
Assert.state(ONLY_ONE_ALLOWED.getAndSet(false), "Only one 'RetryTopicConfigurationSupport' is allowed");
}

Expand Down Expand Up @@ -266,20 +267,21 @@ protected Consumer<DestinationTopicResolver> configureDestinationTopicResolver()
* To provide a custom implementation, either override this method, or
* override the {@link RetryTopicComponentFactory#kafkaBackOffManagerFactory} method
* and return a different {@link KafkaBackOffManagerFactory}.
* @param applicationContext the application context.
* @param registry the {@link ListenerContainerRegistry} to be used to fetch the
* {@link MessageListenerContainer} at runtime to be backed off.
* @param wrapper a {@link RetryTopicSchedulerWrapper}.
* @param taskScheduler a {@link TaskScheduler}.
* @return the instance.
*/
@Bean(name = KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME)
public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(
public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(ApplicationContext applicationContext,
@Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
ListenerContainerRegistry registry, @Nullable RetryTopicSchedulerWrapper wrapper,
@Nullable TaskScheduler taskScheduler) {

KafkaBackOffManagerFactory backOffManagerFactory =
this.componentFactory.kafkaBackOffManagerFactory(registry);
this.componentFactory.kafkaBackOffManagerFactory(registry, applicationContext);
JavaUtils.INSTANCE.acceptIfInstanceOf(ContainerPartitionPausingBackOffManagerFactory.class, backOffManagerFactory,
factory -> configurePartitionPausingFactory(factory, registry,
wrapper != null ? wrapper.getScheduler() : taskScheduler));
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.mockito.ArgumentCaptor;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
Expand Down Expand Up @@ -167,7 +168,8 @@ void testCreateBackOffManager() {
KafkaConsumerBackoffManager backoffManagerMock = mock(KafkaConsumerBackoffManager.class);
TaskScheduler taskSchedulerMock = mock(TaskScheduler.class);
Clock clock = mock(Clock.class);
given(componentFactory.kafkaBackOffManagerFactory(registry)).willReturn(factory);
ApplicationContext ctx = mock(ApplicationContext.class);
given(componentFactory.kafkaBackOffManagerFactory(registry, ctx)).willReturn(factory);
given(factory.create()).willReturn(backoffManagerMock);
RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport() {

Expand All @@ -177,19 +179,21 @@ protected RetryTopicComponentFactory createComponentFactory() {
}

};
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(registry, null,
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, null,
taskSchedulerMock);
assertThat(backoffManager).isEqualTo(backoffManagerMock);
then(componentFactory).should().kafkaBackOffManagerFactory(registry);
then(componentFactory).should().kafkaBackOffManagerFactory(registry, ctx);
then(factory).should().create();
}

@Test
void testCreateBackOffManagerNoConfiguration() {
ListenerContainerRegistry registry = mock(ListenerContainerRegistry.class);
TaskScheduler scheduler = mock(TaskScheduler.class);
ApplicationContext ctx = mock(ApplicationContext.class);
RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport();
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(registry, null, scheduler);
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, null,
scheduler);
assertThat(backoffManager).isNotNull();
}

Expand Down

0 comments on commit 2f50088

Please sign in to comment.