From dd981e1af6631966f013cf05a344c347d61ecef1 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 11 Jul 2022 16:13:18 -0400 Subject: [PATCH] GH-2239: Fix Boot AutoConfiguration See https://github.com/spring-projects/spring-kafka/issues/2239 The previous commit removed the implicit bootstrap in favor of enforcing the user to use the `@EnableKafkaRretryTopic` or explicitly extend `RetryTopicConfigurationSupport`. Unfortunately, this breaks Spring Boot because it can auto configure a `RetryTopicConfiguration` bean, which means the infrastructure beans are required. Fallback to late binding of the infrastructure beans if a `RetryTopicConfiguration` bean is found in the application context. Tested with a Boot app. **cherry-pick to main** --- ...kaListenerAnnotationBeanPostProcessor.java | 49 +++++++++++++++++-- .../AbstractKafkaBackOffManagerFactory.java | 2 +- ...PartitionPausingBackOffManagerFactory.java | 7 ++- .../kafka/retrytopic/RetryTopicBeanNames.java | 14 ++++++ .../RetryTopicComponentFactory.java | 8 ++- .../RetryTopicConfigurationSupport.java | 8 +-- .../RetryTopicConfigurationSupportTests.java | 12 +++-- 7 files changed, 86 insertions(+), 14 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 6f4a477b74..b0132f40d6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -62,6 +62,7 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.expression.StandardBeanExpressionResolver; +import org.springframework.context.support.GenericApplicationContext; import org.springframework.core.MethodIntrospector; import org.springframework.core.OrderComparator; import org.springframework.core.Ordered; @@ -83,11 +84,15 @@ import org.springframework.kafka.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint; import org.springframework.kafka.listener.ContainerGroupSequencer; +import org.springframework.kafka.listener.KafkaConsumerBackoffManager; import org.springframework.kafka.listener.KafkaListenerErrorHandler; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; +import org.springframework.kafka.retrytopic.DestinationTopicResolver; import org.springframework.kafka.retrytopic.RetryTopicBeanNames; import org.springframework.kafka.retrytopic.RetryTopicConfiguration; +import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport; import org.springframework.kafka.retrytopic.RetryTopicConfigurer; +import org.springframework.kafka.retrytopic.RetryTopicSchedulerWrapper; import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.lang.Nullable; import org.springframework.messaging.converter.GenericMessageConverter; @@ -96,6 +101,8 @@ import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory; import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.util.Assert; import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; @@ -516,14 +523,50 @@ private RetryTopicConfigurer getRetryTopicConfigurer() { .getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class); } catch (NoSuchBeanDefinitionException ex) { - this.logger.error("A 'RetryTopicConfigurer' with name " - + RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME + "is required."); - throw ex; + this.retryTopicConfigurer = createDefaultConfigurer(); } } return this.retryTopicConfigurer; } + private RetryTopicConfigurer createDefaultConfigurer() { + if (this.applicationContext instanceof GenericApplicationContext) { + GenericApplicationContext gac = (GenericApplicationContext) this.applicationContext; + gac.registerBean( + RetryTopicBeanNames.DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME, + RetryTopicConfigurationSupport.class, + () -> new RetryTopicConfigurationSupport()); + RetryTopicConfigurationSupport rtcs = this.applicationContext.getBean( + RetryTopicBeanNames.DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME, + RetryTopicConfigurationSupport.class); + DestinationTopicResolver destResolver = rtcs.destinationTopicResolver(); + RetryTopicSchedulerWrapper schedW = gac.getBeanProvider(RetryTopicSchedulerWrapper.class).getIfUnique(); + TaskScheduler sched = gac.getBeanProvider(TaskScheduler.class).getIfUnique(); + if (schedW == null && sched == null) { + RetryTopicSchedulerWrapper newSchedW = new RetryTopicSchedulerWrapper(new ThreadPoolTaskScheduler()); + gac.registerBean(RetryTopicBeanNames.DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME, + RetryTopicSchedulerWrapper.class, () -> newSchedW); + schedW = gac.getBean(RetryTopicSchedulerWrapper.class); + } + KafkaConsumerBackoffManager bom = + rtcs.kafkaConsumerBackoffManager(this.applicationContext, this.registrar.getEndpointRegistry(), + schedW, sched); + RetryTopicConfigurer rtc = rtcs.retryTopicConfigurer(bom, destResolver, this.beanFactory); + + gac.registerBean(RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME, DestinationTopicResolver.class, + () -> destResolver); + gac.registerBean(KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME, + KafkaConsumerBackoffManager.class, () -> bom); + gac.registerBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class, + () -> rtc); + + return this.beanFactory + .getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class); + } + throw new IllegalStateException("When there is no RetryTopicConfigurationSupport bean, the application context " + + "must be a GenericApplicationContext"); + } + private Method checkProxy(Method methodArg, Object bean) { Method method = methodArg; if (AopUtils.isJdkDynamicProxy(bean)) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaBackOffManagerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaBackOffManagerFactory.java index 98d83e0eae..c3f53491b3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaBackOffManagerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaBackOffManagerFactory.java @@ -87,7 +87,7 @@ protected T getBean(String beanName, Class beanClass) { } @Override - public void setApplicationContext(ApplicationContext applicationContext) { + public final void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManagerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManagerFactory.java index ebf428381f..7d850ae56a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManagerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManagerFactory.java @@ -16,6 +16,7 @@ package org.springframework.kafka.listener; +import org.springframework.context.ApplicationContext; import org.springframework.util.Assert; /** @@ -32,9 +33,13 @@ public class ContainerPartitionPausingBackOffManagerFactory extends AbstractKafk /** * Construct an instance with the provided properties. * @param listenerContainerRegistry the registry. + * @param applicationContext the application context. */ - public ContainerPartitionPausingBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry) { + public ContainerPartitionPausingBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry, + ApplicationContext applicationContext) { + super(listenerContainerRegistry); + setApplicationContext(applicationContext); } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicBeanNames.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicBeanNames.java index d1f4f6612a..f9c7fd6487 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicBeanNames.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicBeanNames.java @@ -19,6 +19,7 @@ /** * The bean names for the non-blocking topic-based delayed retries feature. * @author Tomaz Fernandes + * @author Gary Russell * @since 2.9 */ public final class RetryTopicBeanNames { @@ -26,6 +27,13 @@ public final class RetryTopicBeanNames { private RetryTopicBeanNames() { } + /** + * The bean name of an internally managed retry topic configuration support, if + * needed. + */ + public static final String DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME = + "org.springframework.kafka.retrytopic.internalRetryTopicConfigurationSupport"; + /** * The bean name of the internally managed retry topic configurer. */ @@ -50,4 +58,10 @@ private RetryTopicBeanNames() { public static final String DEFAULT_KAFKA_TEMPLATE_BEAN_NAME = "defaultRetryTopicKafkaTemplate"; + /** + * The bean name of the internally registered scheduler wrapper, if needed. + */ + public static final String DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME = + "defaultRetryTopicKafkaTemplate"; + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicComponentFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicComponentFactory.java index e378e321ba..2d6c5bb160 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicComponentFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicComponentFactory.java @@ -19,6 +19,7 @@ import java.time.Clock; import org.springframework.beans.factory.BeanFactory; +import org.springframework.context.ApplicationContext; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpoint; import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory; @@ -150,10 +151,13 @@ public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() { * {@link KafkaConsumerBackoffManager} instance used to back off the partitions. * @param registry the {@link ListenerContainerRegistry} used to fetch the * {@link MessageListenerContainer}. + * @param applicationContext the application context. * @return the instance. */ - public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry) { - return new ContainerPartitionPausingBackOffManagerFactory(registry); + public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry, + ApplicationContext applicationContext) { + + return new ContainerPartitionPausingBackOffManagerFactory(registry, applicationContext); } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java index 4bfecbc5d5..b2ae0e62e1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java @@ -27,6 +27,7 @@ import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafkaRetryTopic; @@ -72,7 +73,7 @@ public class RetryTopicConfigurationSupport { private final RetryTopicComponentFactory componentFactory = createComponentFactory(); - protected RetryTopicConfigurationSupport() { + public RetryTopicConfigurationSupport() { Assert.state(ONLY_ONE_ALLOWED.getAndSet(false), "Only one 'RetryTopicConfigurationSupport' is allowed"); } @@ -266,6 +267,7 @@ protected Consumer configureDestinationTopicResolver() * To provide a custom implementation, either override this method, or * override the {@link RetryTopicComponentFactory#kafkaBackOffManagerFactory} method * and return a different {@link KafkaBackOffManagerFactory}. + * @param applicationContext the application context. * @param registry the {@link ListenerContainerRegistry} to be used to fetch the * {@link MessageListenerContainer} at runtime to be backed off. * @param wrapper a {@link RetryTopicSchedulerWrapper}. @@ -273,13 +275,13 @@ protected Consumer configureDestinationTopicResolver() * @return the instance. */ @Bean(name = KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME) - public KafkaConsumerBackoffManager kafkaConsumerBackoffManager( + public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(ApplicationContext applicationContext, @Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME) ListenerContainerRegistry registry, @Nullable RetryTopicSchedulerWrapper wrapper, @Nullable TaskScheduler taskScheduler) { KafkaBackOffManagerFactory backOffManagerFactory = - this.componentFactory.kafkaBackOffManagerFactory(registry); + this.componentFactory.kafkaBackOffManagerFactory(registry, applicationContext); JavaUtils.INSTANCE.acceptIfInstanceOf(ContainerPartitionPausingBackOffManagerFactory.class, backOffManagerFactory, factory -> configurePartitionPausingFactory(factory, registry, wrapper != null ? wrapper.getScheduler() : taskScheduler)); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java index 0386b5fd4f..0f072ed997 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java @@ -35,6 +35,7 @@ import org.mockito.ArgumentCaptor; import org.springframework.beans.factory.BeanFactory; +import org.springframework.context.ApplicationContext; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; @@ -167,7 +168,8 @@ void testCreateBackOffManager() { KafkaConsumerBackoffManager backoffManagerMock = mock(KafkaConsumerBackoffManager.class); TaskScheduler taskSchedulerMock = mock(TaskScheduler.class); Clock clock = mock(Clock.class); - given(componentFactory.kafkaBackOffManagerFactory(registry)).willReturn(factory); + ApplicationContext ctx = mock(ApplicationContext.class); + given(componentFactory.kafkaBackOffManagerFactory(registry, ctx)).willReturn(factory); given(factory.create()).willReturn(backoffManagerMock); RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport() { @@ -177,10 +179,10 @@ protected RetryTopicComponentFactory createComponentFactory() { } }; - KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(registry, null, + KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, null, taskSchedulerMock); assertThat(backoffManager).isEqualTo(backoffManagerMock); - then(componentFactory).should().kafkaBackOffManagerFactory(registry); + then(componentFactory).should().kafkaBackOffManagerFactory(registry, ctx); then(factory).should().create(); } @@ -188,8 +190,10 @@ protected RetryTopicComponentFactory createComponentFactory() { void testCreateBackOffManagerNoConfiguration() { ListenerContainerRegistry registry = mock(ListenerContainerRegistry.class); TaskScheduler scheduler = mock(TaskScheduler.class); + ApplicationContext ctx = mock(ApplicationContext.class); RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport(); - KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(registry, null, scheduler); + KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, null, + scheduler); assertThat(backoffManager).isNotNull(); }