Skip to content

Commit

Permalink
Address code review comments
Browse files Browse the repository at this point in the history
* Change deprecated constants for strings
* Remove deprecation suppressions
* Remove ApplicationContext and BeanFactory from constructors
* Change TaskExecutor shutdown logic to DisposableBean
  • Loading branch information
tomazfernandes committed Apr 12, 2022
1 parent 2d29a2b commit 9eb9ec4
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
import org.springframework.kafka.retrytopic.RetryTopicBootstrapper;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
import org.springframework.kafka.retrytopic.RetryTopicInternalBeanNames;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.GenericMessageConverter;
Expand Down Expand Up @@ -510,11 +509,10 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
return true;
}

@SuppressWarnings("deprecation")
private RetryTopicConfigurer getRetryTopicConfigurer() {
bootstrapRetryTopicIfNecessary();
return this.beanFactory.containsBean(RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER)
? this.beanFactory.getBean(RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER, RetryTopicConfigurer.class)
return this.beanFactory.containsBean("internalRetryTopicConfigurer")
? this.beanFactory.getBean("internalRetryTopicConfigurer", RetryTopicConfigurer.class)
: this.beanFactory.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class);
}

Expand All @@ -527,13 +525,10 @@ private void bootstrapRetryTopicIfNecessary() {
+ this.beanFactory.getClass().getSimpleName());
}
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) this.beanFactory;
if (!registry.containsBeanDefinition(RetryTopicInternalBeanNames
.RETRY_TOPIC_BOOTSTRAPPER)) {
registry.registerBeanDefinition(RetryTopicInternalBeanNames
.RETRY_TOPIC_BOOTSTRAPPER,
if (!registry.containsBeanDefinition("internalRetryTopicBootstrapper")) {
registry.registerBeanDefinition("internalRetryTopicBootstrapper",
new RootBeanDefinition(RetryTopicBootstrapper.class));
this.beanFactory.getBean(RetryTopicInternalBeanNames
.RETRY_TOPIC_BOOTSTRAPPER, RetryTopicBootstrapper.class).bootstrapRetryTopic();
this.beanFactory.getBean("internalRetryTopicBootstrapper", RetryTopicBootstrapper.class).bootstrapRetryTopic();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,9 @@

import java.time.Clock;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.core.task.TaskExecutor;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.KafkaConsumerTimingAdjuster;
Expand All @@ -46,9 +41,9 @@
* @author Tomaz Fernandes
* @since 2.9
*/
public class KafkaBackOffManagerConfigurationSupport implements ApplicationContextAware {
public class KafkaBackOffManagerConfigurationSupport implements DisposableBean {

private ConfigurableApplicationContext applicationContext;
private ThreadPoolTaskExecutor taskExecutor;

/**
* Provides the {@link KafkaConsumerBackoffManager} instance.
Expand Down Expand Up @@ -93,16 +88,17 @@ protected KafkaConsumerTimingAdjuster timingAdjuster(TaskExecutor taskExecutor)
* @return the instance.
*/
protected TaskExecutor timingAdjusterTaskExecutor() {
Assert.isNull(this.taskExecutor, "A TaskExecutor has already been set.");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.initialize();
this.applicationContext
.addApplicationListener((ApplicationListener<ContextClosedEvent>) event -> executor.shutdown());
this.taskExecutor = executor;
return executor;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Assert.isInstanceOf(ConfigurableApplicationContext.class, applicationContext);
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
public void destroy() throws Exception {
if (this.taskExecutor != null) {
this.taskExecutor.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.time.Clock;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
Expand Down Expand Up @@ -55,7 +54,6 @@ public class RetryTopicComponentFactory {
* for configuring non-blocking topic-based delayed retries for a given
* {@link KafkaListenerEndpoint} by processing the appropriate
* {@link RetryTopicConfiguration}.
* @param beanFactory the {@link BeanFactory} to be used in the configuration process.
* @param destinationTopicProcessor the {@link DestinationTopicProcessor} that will be
* used to process the {@link DestinationTopic} instances.
* @param listenerContainerFactoryConfigurer the
Expand All @@ -69,13 +67,12 @@ public class RetryTopicComponentFactory {
* that will be used to provide the property names for the retry topics' endpoints.
* @return the instance.
*/
public RetryTopicConfigurer retryTopicConfigurer(BeanFactory beanFactory,
DestinationTopicProcessor destinationTopicProcessor,
public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
ListenerContainerFactoryResolver factoryResolver,
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
return new RetryTopicConfigurer(destinationTopicProcessor, factoryResolver,
listenerContainerFactoryConfigurer, beanFactory, retryTopicNamesProviderFactory);
listenerContainerFactoryConfigurer, retryTopicNamesProviderFactory);
}

/**
Expand All @@ -95,11 +92,10 @@ public DestinationTopicProcessor destinationTopicProcessor(DestinationTopicResol
* the {@link DestinationTopic} instances
* and resolve which a given record should be forwarded to.
*
* @param context the {@link ApplicationContext} to be used.
* @return the instance.
*/
public DestinationTopicResolver destinationTopicResolver(ApplicationContext context) {
return new DefaultDestinationTopicResolver(this.internalRetryTopicClock, context);
public DestinationTopicResolver destinationTopicResolver() {
return new DefaultDestinationTopicResolver(this.internalRetryTopicClock);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public RetryTopicConfigurer retryTopicConfigurer(@Qualifier(KafkaListenerConfigU
processListenerContainerFactoryConfigurer(lcfc);

RetryTopicConfigurer retryTopicConfigurer = this.componentFactory
.retryTopicConfigurer(beanFactory, destinationTopicProcessor, lcfc,
.retryTopicConfigurer(destinationTopicProcessor, lcfc,
factoryResolver, retryTopicNamesProviderFactory);

Consumer<RetryTopicConfigurer> configurerConsumer = configureRetryTopicConfigurer();
Expand Down Expand Up @@ -219,12 +219,11 @@ protected void configureCustomizers(CustomizersConfigurer customizersConfigurer)
* <li>{@link #createComponentFactory} for providing a subclass instance.
* </ul>
*
* @param context the {@link ApplicationContext}.
* @return the instance.
*/
@Bean(name = RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME)
public DestinationTopicResolver destinationTopicResolver(ApplicationContext context) {
DestinationTopicResolver destinationTopicResolver = this.componentFactory.destinationTopicResolver(context);
public DestinationTopicResolver destinationTopicResolver() {
DestinationTopicResolver destinationTopicResolver = this.componentFactory.destinationTopicResolver();
if (destinationTopicResolver instanceof DefaultDestinationTopicResolver) {
DefaultDestinationTopicResolver ddtr = (DefaultDestinationTopicResolver) destinationTopicResolver;
NonBlockingRetriesConfigurer configurer = new NonBlockingRetriesConfigurer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.kafka.listener.ExceptionClassifier;
Expand All @@ -47,7 +49,7 @@
*
*/
public class DefaultDestinationTopicResolver extends ExceptionClassifier
implements DestinationTopicResolver, ApplicationListener<ContextRefreshedEvent> {
implements DestinationTopicResolver, ApplicationListener<ContextRefreshedEvent>, ApplicationContextAware {

private static final String NO_OPS_SUFFIX = "-noOps";

Expand All @@ -60,18 +62,37 @@ public class DefaultDestinationTopicResolver extends ExceptionClassifier

private final Clock clock;

private final ApplicationContext applicationContext;
private ApplicationContext applicationContext;

private boolean contextRefreshed;

@Deprecated
public DefaultDestinationTopicResolver(Clock clock, ApplicationContext applicationContext) {
this(clock);
this.applicationContext = applicationContext;
}

/**
* Constructs an instance with the given clock.
* @param clock the clock to be used for time-based operations
* such as verifying timeouts.
* @since 2.9
*/
public DefaultDestinationTopicResolver(Clock clock) {
this.clock = clock;
this.sourceDestinationsHolderMap = new HashMap<>();
this.destinationsTopicMap = new HashMap<>();
this.contextRefreshed = false;
}

/**
* Constructs an instance with a default clock.
* @since 2.9
*/
public DefaultDestinationTopicResolver() {
this(Clock.systemUTC());
}

@Override
public DestinationTopic resolveDestinationTopic(String topic, Integer attempt, Exception e,
long originalTimestamp) {
Expand Down Expand Up @@ -197,6 +218,11 @@ public boolean isContextRefreshed() {
return this.contextRefreshed;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

public static class DestinationTopicHolder {

private final DestinationTopic sourceDestination;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,9 @@ public class ListenerContainerFactoryConfigurer {

private final Clock clock;

@SuppressWarnings("deprecation")
public ListenerContainerFactoryConfigurer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
@Qualifier("internalBackOffClock") Clock clock) {
this.kafkaConsumerBackoffManager = kafkaConsumerBackoffManager;
this.deadLetterPublishingRecovererFactory = deadLetterPublishingRecovererFactory;
this.clock = clock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Optional;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -52,7 +53,6 @@ public class ListenerContainerFactoryResolver {

private final Cache retryEndpointCache;

@SuppressWarnings("deprecation")
public ListenerContainerFactoryResolver(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
this.mainEndpointCache = new Cache();
Expand All @@ -64,7 +64,7 @@ public ListenerContainerFactoryResolver(BeanFactory beanFactory) {
(fromKLAnnotation, configuration) -> configuration.factoryFromRetryTopicConfiguration,
(fromKLAnnotation, configuration) -> fromBeanName(configuration.listenerContainerFactoryName),
(fromKLAnnotation, configuration) ->
fromBeanName(RetryTopicInternalBeanNames.DEFAULT_LISTENER_FACTORY_BEAN_NAME),
fromBeanName("internalRetryTopicListenerContainerFactory"),
(fromKLAnnotation, configuration) ->
fromBeanName(RetryTopicBeanNames.DEFAULT_LISTENER_CONTAINER_FACTORY_BEAN_NAME));

Expand All @@ -74,7 +74,7 @@ public ListenerContainerFactoryResolver(BeanFactory beanFactory) {
(fromKLAnnotation, configuration) -> fromBeanName(configuration.listenerContainerFactoryName),
(fromKLAnnotation, configuration) -> fromKLAnnotation,
(fromKLAnnotation, configuration) ->
fromBeanName(RetryTopicInternalBeanNames.DEFAULT_LISTENER_FACTORY_BEAN_NAME),
fromBeanName("internalRetryTopicListenerContainerFactory"),
(fromKLAnnotation, configuration) ->
fromBeanName(RetryTopicBeanNames.DEFAULT_LISTENER_CONTAINER_FACTORY_BEAN_NAME));
}
Expand Down Expand Up @@ -141,9 +141,14 @@ private KafkaListenerContainerFactory<?> getFactoryFromKLA(KafkaListenerContaine

@Nullable
private ConcurrentKafkaListenerContainerFactory<?, ?> fromBeanName(String factoryBeanName) {
return StringUtils.hasText(factoryBeanName)
? this.beanFactory.getBean(factoryBeanName, ConcurrentKafkaListenerContainerFactory.class)
: null;
try {
return StringUtils.hasText(factoryBeanName)
? this.beanFactory.getBean(factoryBeanName, ConcurrentKafkaListenerContainerFactory.class)
: null;
}
catch (NoSuchBeanDefinitionException ex) {
return null;
}
}

private interface FactoryResolver {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.core.log.LogAccessor;
Expand Down Expand Up @@ -56,7 +58,7 @@
* <p>How it works:
*
* <p>If a message processing throws an exception, the configured
* {@link org.springframework.kafka.listener.SeekToCurrentErrorHandler}
* {@link org.springframework.kafka.listener.CommonErrorHandler}
* and {@link org.springframework.kafka.listener.DeadLetterPublishingRecoverer} forwards the message to the next topic, using a
* {@link org.springframework.kafka.retrytopic.DestinationTopicResolver}
* to know the next topic and the delay for it.
Expand Down Expand Up @@ -152,7 +154,7 @@
* <p>DLT Handling:
*
* <p>The DLT handler method can be provided through the
* {@link RetryTopicConfigurationBuilder#dltHandlerMethod(Class, String)} method,
* {@link RetryTopicConfigurationBuilder#dltHandlerMethod(String, String)} method,
* providing the class and method name that should handle the DLT topic. If a bean
* instance of this type is found in the {@link BeanFactory} it is the instance used.
* If not an instance is created. The class can use dependency injection as a normal bean.
Expand Down Expand Up @@ -196,11 +198,11 @@
* @see org.springframework.kafka.annotation.RetryableTopic
* @see org.springframework.kafka.annotation.KafkaListener
* @see org.springframework.retry.annotation.Backoff
* @see org.springframework.kafka.listener.SeekToCurrentErrorHandler
* @see org.springframework.kafka.listener.DefaultErrorHandler
* @see org.springframework.kafka.listener.DeadLetterPublishingRecoverer
*
*/
public class RetryTopicConfigurer {
public class RetryTopicConfigurer implements BeanFactoryAware {

private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(RetryTopicConfigurer.class));

Expand All @@ -216,7 +218,7 @@ public class RetryTopicConfigurer {

private final ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer;

private final BeanFactory beanFactory;
private BeanFactory beanFactory;

private final RetryTopicNamesProviderFactory retryTopicNamesProviderFactory;

Expand All @@ -230,17 +232,34 @@ public class RetryTopicConfigurer {
* @param beanFactory the bean factory.
* @param retryTopicNamesProviderFactory the retry topic names factory.
*/
@Autowired
@Deprecated
public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
ListenerContainerFactoryResolver containerFactoryResolver,
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
BeanFactory beanFactory,
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {

this(destinationTopicProcessor, containerFactoryResolver,
listenerContainerFactoryConfigurer, retryTopicNamesProviderFactory);
this.beanFactory = beanFactory;
}

/**
* Create an instance with the provided properties.
* @param destinationTopicProcessor the destination topic processor.
* @param containerFactoryResolver the container factory resolver.
* @param listenerContainerFactoryConfigurer the container factory configurer.
* @param retryTopicNamesProviderFactory the retry topic names factory.
*/
@Autowired
public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
ListenerContainerFactoryResolver containerFactoryResolver,
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {

this.destinationTopicProcessor = destinationTopicProcessor;
this.containerFactoryResolver = containerFactoryResolver;
this.listenerContainerFactoryConfigurer = listenerContainerFactoryConfigurer;
this.beanFactory = beanFactory;
this.retryTopicNamesProviderFactory = retryTopicNamesProviderFactory;
}

Expand Down Expand Up @@ -418,6 +437,11 @@ public void useLegacyFactoryConfigurer(boolean useLegacyFactoryConfigurer) {
this.useLegacyFactoryConfigurer = useLegacyFactoryConfigurer;
}

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}

public interface EndpointProcessor extends Consumer<MethodKafkaListenerEndpoint<?, ?>> {

default void process(MethodKafkaListenerEndpoint<?, ?> listenerEndpoint) {
Expand Down

0 comments on commit 9eb9ec4

Please sign in to comment.