Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tomazfernandes committed Apr 15, 2022
1 parent b147075 commit d583db3
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 7 deletions.
Expand Up @@ -20,7 +20,11 @@

import org.springframework.beans.factory.BeanFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.KafkaBackOffManagerFactory;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.ListenerContainerRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.PartitionPausingBackOffManagerFactory;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.retrytopic.DeadLetterPublishingRecovererFactory;
import org.springframework.kafka.retrytopic.DefaultDestinationTopicProcessor;
Expand Down Expand Up @@ -151,6 +155,18 @@ public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
return new SuffixingRetryTopicNamesProviderFactory();
}

/**
* Create the {@link KafkaBackOffManagerFactory} that will be used to create the
* {@link KafkaConsumerBackoffManager} instance used to back off the partitions.
* @param registry the {@link ListenerContainerRegistry} used to fetch the
* {@link MessageListenerContainer}.
* @return the instance.
*/
public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry) {
return new PartitionPausingBackOffManagerFactory(registry);
}


/**
* Return the {@link Clock} instance that will be used for all
* time-related operations in the retry topic processes.
Expand Down
Expand Up @@ -130,9 +130,9 @@ private void processDeadLetterPublishingContainerFactory(
.setDeadLetterPublishingRecovererCustomizer(customizersConfigurer
.deadLetterPublishingRecovererCustomizer);
}
Consumer<DeadLetterPublishingRecovererFactory> dlprConsumer = configureDeadLetterPublishingContainerFactory();
Assert.notNull(dlprConsumer, "configureDeadLetterPublishingContainerFactory must not return null");
dlprConsumer.accept(deadLetterPublishingRecovererFactory);
Consumer<DeadLetterPublishingRecovererFactory> dlprfConsumer = configureDeadLetterPublishingContainerFactory();
Assert.notNull(dlprfConsumer, "configureDeadLetterPublishingContainerFactory must not return null");
dlprfConsumer.accept(deadLetterPublishingRecovererFactory);
}

/**
Expand Down Expand Up @@ -243,6 +243,7 @@ public DestinationTopicResolver destinationTopicResolver() {
}
Consumer<DestinationTopicResolver> resolverConsumer = customizeDestinationTopicResolver();
Assert.notNull(resolverConsumer, "customizeDestinationTopicResolver must not return null");
resolverConsumer.accept(destinationTopicResolver);
return destinationTopicResolver;
}

Expand All @@ -268,8 +269,7 @@ protected Consumer<DestinationTopicResolver> customizeDestinationTopicResolver()
public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(
@Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
ListenerContainerRegistry registry) {
PartitionPausingBackOffManagerFactory factory = new PartitionPausingBackOffManagerFactory(registry);
return factory.create();
return componentFactory.kafkaBackOffManagerFactory(registry).create();
}

/**
Expand Down Expand Up @@ -324,12 +324,13 @@ public final NonBlockingRetriesConfigurer addToFatalExceptions(Class<? extends E

@SafeVarargs
public final NonBlockingRetriesConfigurer removeFromFatalExceptions(Class<? extends Exception>... exceptions) {
this.addToFatalExceptions = exceptions;
this.removeFromFatalExceptions = exceptions;
return this;
}

public void clearDefaultFatalExceptions() {
public NonBlockingRetriesConfigurer clearDefaultFatalExceptions() {
this.clearDefaultFatalExceptions = true;
return this;
}
}

Expand Down
@@ -0,0 +1,223 @@
package org.springframework.kafka.config;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;

import java.time.Clock;
import java.util.Map;
import java.util.function.Consumer;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.KafkaBackOffManagerFactory;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.ListenerContainerRegistry;
import org.springframework.kafka.retrytopic.DeadLetterPublishingRecovererFactory;
import org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver;
import org.springframework.kafka.retrytopic.DestinationTopicProcessor;
import org.springframework.kafka.retrytopic.DestinationTopicResolver;
import org.springframework.kafka.retrytopic.ListenerContainerFactoryConfigurer;
import org.springframework.kafka.retrytopic.ListenerContainerFactoryResolver;
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory;
import org.springframework.util.backoff.BackOff;

class RetryTopicConfigurationSupportTest {

@SuppressWarnings("unchecked")
@Test
void testCreateConfigurer() {
RetryTopicComponentFactory componentFactory = mock(RetryTopicComponentFactory.class);
KafkaConsumerBackoffManager backoffManager = mock(KafkaConsumerBackoffManager.class);
DestinationTopicResolver resolver = mock(DestinationTopicResolver.class);
DestinationTopicProcessor processor = mock(DestinationTopicProcessor.class);
ListenerContainerFactoryConfigurer lcfc = mock(ListenerContainerFactoryConfigurer.class);
ListenerContainerFactoryResolver lcfr = mock(ListenerContainerFactoryResolver.class);
RetryTopicNamesProviderFactory namesProviderFactory = mock(RetryTopicNamesProviderFactory.class);
BeanFactory beanFactory = mock(BeanFactory.class);
DeadLetterPublishingRecovererFactory dlprf = mock(DeadLetterPublishingRecovererFactory.class);
RetryTopicConfigurer topicConfigurer = mock(RetryTopicConfigurer.class);
Clock clock = mock(Clock.class);

given(componentFactory.deadLetterPublishingRecovererFactory(resolver)).willReturn(dlprf);
given(componentFactory.listenerContainerFactoryConfigurer(backoffManager, dlprf, clock)).willReturn(lcfc);
given(componentFactory.listenerContainerFactoryResolver(beanFactory)).willReturn(lcfr);
given(componentFactory.internalRetryTopicClock()).willReturn(clock);
given(componentFactory.destinationTopicProcessor(resolver)).willReturn(processor);
given(componentFactory.retryTopicNamesProviderFactory()).willReturn(namesProviderFactory);
given(componentFactory.retryTopicConfigurer(processor, lcfc, lcfr, namesProviderFactory)).willReturn(topicConfigurer);

Consumer<ConcurrentMessageListenerContainer<?, ?>> listenerContainerCustomizer = mock(Consumer.class);
Consumer<DeadLetterPublishingRecoverer> dlprCustomizer = mock(Consumer.class);
Consumer<DeadLetterPublishingRecovererFactory> dlprfCustomizer = mock(Consumer.class);
Consumer<RetryTopicConfigurer> rtconfigurer = mock(Consumer.class);
Consumer<ListenerContainerFactoryConfigurer> lcfcConsumer = mock(Consumer.class);
Consumer<CommonErrorHandler> errorHandlerCustomizer = mock(Consumer.class);
BackOff backoff = mock(BackOff.class);

RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport() {
@Override
protected RetryTopicComponentFactory createComponentFactory() {
return componentFactory;
}

@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer
.customizeDeadLetterPublishingRecoverer(dlprCustomizer)
.customizeListenerContainer(listenerContainerCustomizer)
.customizeErrorHandler(errorHandlerCustomizer);
}

@Override
protected Consumer<ListenerContainerFactoryConfigurer> configureListenerContainerFactoryConfigurer() {
return lcfcConsumer;
}

@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
return dlprfCustomizer;
}

@Override
protected Consumer<RetryTopicConfigurer> configureRetryTopicConfigurer() {
return rtconfigurer;
}

@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(RuntimeException.class)
.backOff(backoff);
}
};

RetryTopicConfigurer retryTopicConfigurer = support.retryTopicConfigurer(backoffManager, resolver, beanFactory);
assertThat(retryTopicConfigurer).isNotNull();

then(componentFactory).should().destinationTopicProcessor(resolver);
then(componentFactory).should().deadLetterPublishingRecovererFactory(resolver);
then(componentFactory).should().listenerContainerFactoryConfigurer(backoffManager, dlprf, clock);
then(componentFactory).should().listenerContainerFactoryResolver(beanFactory);
then(componentFactory).should().retryTopicNamesProviderFactory();
then(componentFactory).should().retryTopicConfigurer(processor, lcfc, lcfr, namesProviderFactory);

then(dlprf).should().setDeadLetterPublishingRecovererCustomizer(dlprCustomizer);
then(lcfc).should().setContainerCustomizer(listenerContainerCustomizer);
then(lcfc).should().setErrorHandlerCustomizer(errorHandlerCustomizer);
assertThatThrownBy(lcfc::setBlockingRetryableExceptions).isInstanceOf(IllegalStateException.class);
then(lcfc).should().setBlockingRetriesBackOff(backoff);
then(dlprfCustomizer).should().accept(dlprf);
then(rtconfigurer).should().accept(topicConfigurer);
then(lcfcConsumer).should().accept(lcfc);

}

@Test
void testRetryTopicConfigurerNoConfiguration() {
KafkaConsumerBackoffManager backoffManager = mock(KafkaConsumerBackoffManager.class);
DestinationTopicResolver resolver = mock(DestinationTopicResolver.class);
BeanFactory beanFactory = mock(BeanFactory.class);
RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport();
RetryTopicConfigurer retryTopicConfigurer = support.retryTopicConfigurer(backoffManager, resolver, beanFactory);
assertThat(retryTopicConfigurer).isNotNull();
}

@Test
void testCreateBackOffManager() {
ListenerContainerRegistry registry = mock(ListenerContainerRegistry.class);
RetryTopicComponentFactory componentFactory = mock(RetryTopicComponentFactory.class);
KafkaBackOffManagerFactory factory = mock(KafkaBackOffManagerFactory.class);
KafkaConsumerBackoffManager backoffManagerMock = mock(KafkaConsumerBackoffManager.class);
given(componentFactory.kafkaBackOffManagerFactory(registry)).willReturn(factory);
given(factory.create()).willReturn(backoffManagerMock);

RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport() {
@Override
protected RetryTopicComponentFactory createComponentFactory() {
return componentFactory;
}
};
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(registry);

assertThat(backoffManager).isEqualTo(backoffManagerMock);
then(componentFactory).should().kafkaBackOffManagerFactory(registry);
then(factory).should().create();
}

@Test
void testCreateBackOffManagerNoConfiguration() {
ListenerContainerRegistry registry = mock(ListenerContainerRegistry.class);
RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport();
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(registry);
assertThat(backoffManager).isNotNull();
}

@SuppressWarnings("unchecked")
@Test
void testCreateDestinationTopicResolver() {
RetryTopicComponentFactory componentFactory = mock(RetryTopicComponentFactory.class);
DefaultDestinationTopicResolver resolverMock = mock(DefaultDestinationTopicResolver.class);
given(componentFactory.destinationTopicResolver()).willReturn(resolverMock);
Consumer<DestinationTopicResolver> dtrConsumer = mock(Consumer.class);

RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport() {
@Override
protected RetryTopicComponentFactory createComponentFactory() {
return componentFactory;
}

@Override
protected Consumer<DestinationTopicResolver> customizeDestinationTopicResolver() {
return dtrConsumer;
}

@Override
protected void configureNonBlockingRetries(NonBlockingRetriesConfigurer nonBlockingRetries) {
nonBlockingRetries
.clearDefaultFatalExceptions()
.removeFromFatalExceptions(IllegalStateException.class);
}
};

DefaultDestinationTopicResolver resolver = (DefaultDestinationTopicResolver) support.destinationTopicResolver();
assertThat(resolver).isEqualTo(resolverMock);

then(dtrConsumer).should().accept(resolverMock);
then(resolverMock).should().removeClassification(IllegalStateException.class);
then(resolverMock).should().setClassifications(any(Map.class), eq(true));

}

@Test
void testCreateDestinationTopicResolverNoConfiguration() {
RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport();
DestinationTopicResolver resolver = support.destinationTopicResolver();
assertThat(resolver).isNotNull();
}

@Test
void testCreatesComponentFactory() {
RetryTopicConfigurationSupport configurationSupport = new RetryTopicConfigurationSupport();
assertThat(configurationSupport).hasFieldOrProperty("componentFactory").isNotNull();
}

@Test
void testCreatesBootstrapper() {
GenericApplicationContext context = mock(GenericApplicationContext.class);
given(context.getAutowireCapableBeanFactory()).willReturn(mock(DefaultListableBeanFactory.class));
RetryTopicConfigurationSupport configurationSupport = new RetryTopicConfigurationSupport();
assertThat(configurationSupport.retryTopicBootstrapper(context)).isNotNull();
}

}

0 comments on commit d583db3

Please sign in to comment.