Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2226: Add RetryTopicConfigurationSupport #2227

Closed
wants to merge 11 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.springframework.kafka.retrytopic.SuffixingRetryTopicNamesProviderFactory;

/**
* Provides the component instances that will be used with
* Provide the component instances that will be used with
* {@link RetryTopicConfigurationSupport}. Override any of the methods to provide
* a different implementation or subclass, then override the
* {@link RetryTopicConfigurationSupport#createComponentFactory()} method
Expand All @@ -50,7 +50,7 @@ public class RetryTopicComponentFactory {
private final Clock internalRetryTopicClock = createInternalRetryTopicClock();

/**
* Creates the {@link RetryTopicConfigurer} that will serve as an entry point
* Create the {@link RetryTopicConfigurer} that will serve as an entry point
* for configuring non-blocking topic-based delayed retries for a given
* {@link KafkaListenerEndpoint} by processing the appropriate
* {@link RetryTopicConfiguration}.
Expand All @@ -76,7 +76,7 @@ public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor desti
}

/**
* Creates the {@link DestinationTopicProcessor} that will be used to process the
* Create the {@link DestinationTopicProcessor} that will be used to process the
* {@link DestinationTopic} instances and store them in the provided
* {@link DestinationTopicResolver}.
* @param destinationTopicResolver the {@link DestinationTopicResolver}
Expand All @@ -88,7 +88,7 @@ public DestinationTopicProcessor destinationTopicProcessor(DestinationTopicResol
}

/**
* Creates the instance of {@link DestinationTopicResolver} that will be used to store
* Create the instance of {@link DestinationTopicResolver} that will be used to store
* the {@link DestinationTopic} instances
* and resolve which a given record should be forwarded to.
*
Expand All @@ -99,7 +99,7 @@ public DestinationTopicResolver destinationTopicResolver() {
}

/**
* Creates a {@link DeadLetterPublishingRecovererFactory} that will be used to create
* Create a {@link DeadLetterPublishingRecovererFactory} that will be used to create
* the {@link DeadLetterPublishingRecoverer} that will forward the records to a given
* {@link DestinationTopic}.
*
Expand All @@ -113,7 +113,7 @@ public DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory
}

/**
* Creates the {@link ListenerContainerFactoryResolver} that will be used to resolve
* Create the {@link ListenerContainerFactoryResolver} that will be used to resolve
* the appropriate {@link KafkaListenerContainerFactory} for a given topic.
*
* @param beanFactory the {@link BeanFactory} that will be used to retrieve the
Expand All @@ -125,7 +125,7 @@ public ListenerContainerFactoryResolver listenerContainerFactoryResolver(BeanFac
}

/**
* Creates a {@link ListenerContainerFactoryConfigurer} that will be used to
* Create a {@link ListenerContainerFactoryConfigurer} that will be used to
* configure the {@link KafkaListenerContainerFactory} resolved by the
* {@link ListenerContainerFactoryResolver}.
* @param kafkaConsumerBackoffManager the {@link KafkaConsumerBackoffManager} used
Expand All @@ -143,7 +143,7 @@ public ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer(Kaf
}

/**
* Creates the {@link RetryTopicNamesProviderFactory} instance that will be used
* Create the {@link RetryTopicNamesProviderFactory} instance that will be used
* to provide the property names for the retry topics' {@link KafkaListenerEndpoint}.
* @return the instance.
*/
Expand All @@ -152,7 +152,7 @@ public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
}

/**
* Returns the {@link Clock} instance that will be used for all
* Return the {@link Clock} instance that will be used for all
* time-related operations in the retry topic processes.
* @return the instance.
*/
Expand All @@ -161,7 +161,7 @@ public Clock internalRetryTopicClock() {
}

/**
* Creates a {@link Clock} instance that will be used for all time-related operations
* Create a {@link Clock} instance that will be used for all time-related operations
* in the retry topic processes.
* @return the instance.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.stream.Stream;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -61,12 +60,10 @@
* @author Tomaz Fernandes
* @since 2.9
*/
public class RetryTopicConfigurationSupport implements DisposableBean {
public class RetryTopicConfigurationSupport {

private final RetryTopicComponentFactory componentFactory = createComponentFactory();

private DisposableBean disposableBackOffManagerFactory;

/**
* Return a global {@link RetryTopicConfigurer} for configuring retry topics
* for {@link KafkaListenerEndpoint} instances with a corresponding
Expand Down Expand Up @@ -272,7 +269,6 @@ public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(
@Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
ListenerContainerRegistry registry) {
PartitionPausingBackOffManagerFactory factory = new PartitionPausingBackOffManagerFactory(registry);
this.disposableBackOffManagerFactory = factory;
return factory.create();
}

Expand All @@ -292,13 +288,6 @@ RetryTopicBootstrapper retryTopicBootstrapper(ApplicationContext context) {
return new RetryTopicBootstrapper(context, context.getAutowireCapableBeanFactory());
tomazfernandes marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void destroy() throws Exception {
if (this.disposableBackOffManagerFactory != null) {
this.disposableBackOffManagerFactory.destroy();
}
}

public static class BlockingRetriesConfigurer {

private BackOff backOff;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

import java.time.Clock;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;

/**
Expand All @@ -31,8 +29,7 @@
* @author Tomaz Fernandes
* @since 2.7
*/
public class PartitionPausingBackOffManagerFactory extends AbstractKafkaBackOffManagerFactory
implements DisposableBean {
public class PartitionPausingBackOffManagerFactory extends AbstractKafkaBackOffManagerFactory {

private boolean timingAdjustmentEnabled = true;

Expand Down Expand Up @@ -150,24 +147,9 @@ private KafkaConsumerTimingAdjuster getOrCreateBackOffTimingAdjustmentManager()
if (this.timingAdjustmentManager != null) {
return this.timingAdjustmentManager;
}
return new WakingKafkaConsumerTimingAdjuster(getOrCreateTimingAdjustmentThreadExecutor());
return this.taskExecutor != null
? new WakingKafkaConsumerTimingAdjuster(this.taskExecutor)
: new WakingKafkaConsumerTimingAdjuster();
}

private TaskExecutor getOrCreateTimingAdjustmentThreadExecutor() {
if (this.taskExecutor != null) {
return this.taskExecutor;
}
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.initialize();
this.taskExecutor = executor;
return executor;
}

@Override
public void destroy() throws Exception {
if (this.taskExecutor != null
&& ThreadPoolTaskExecutor.class.isAssignableFrom(this.taskExecutor.getClass())) {
((ThreadPoolTaskExecutor) this.taskExecutor).shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.TopicPartition;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationListener;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.event.ListenerContainerPartitionIdleEvent;
Expand All @@ -46,7 +47,7 @@
* @see DefaultErrorHandler
*/
public class PartitionPausingBackoffManager implements KafkaConsumerBackoffManager,
ApplicationListener<ListenerContainerPartitionIdleEvent> {
ApplicationListener<ListenerContainerPartitionIdleEvent>, DisposableBean {

private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(KafkaConsumerBackoffManager.class));

Expand Down Expand Up @@ -75,10 +76,7 @@ public class PartitionPausingBackoffManager implements KafkaConsumerBackoffManag
public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContainerRegistry,
KafkaConsumerTimingAdjuster kafkaConsumerTimingAdjuster) {

this.listenerContainerRegistry = listenerContainerRegistry;
this.kafkaConsumerTimingAdjuster = kafkaConsumerTimingAdjuster;
this.clock = Clock.systemUTC();
this.backOffContexts = new HashMap<>();
this(listenerContainerRegistry, kafkaConsumerTimingAdjuster, Clock.systemUTC());
}

/**
Expand All @@ -91,11 +89,7 @@ public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContaine
* @param listenerContainerRegistry the listenerContainerRegistry to use.
*/
public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContainerRegistry) {

this.listenerContainerRegistry = listenerContainerRegistry;
this.kafkaConsumerTimingAdjuster = null;
this.clock = Clock.systemUTC();
this.backOffContexts = new HashMap<>();
this(listenerContainerRegistry, null, Clock.systemUTC());
}

/**
Expand All @@ -107,7 +101,7 @@ public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContaine
* @param clock the clock to use.
*/
public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContainerRegistry,
KafkaConsumerTimingAdjuster kafkaConsumerTimingAdjuster,
@Nullable KafkaConsumerTimingAdjuster kafkaConsumerTimingAdjuster,
Clock clock) {

this.listenerContainerRegistry = listenerContainerRegistry;
Expand All @@ -124,11 +118,7 @@ public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContaine
* @param clock the clock to use.
*/
public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContainerRegistry, Clock clock) {

this.listenerContainerRegistry = listenerContainerRegistry;
this.clock = clock;
this.kafkaConsumerTimingAdjuster = null;
this.backOffContexts = new HashMap<>();
this(listenerContainerRegistry, null, clock);
}

/**
Expand Down Expand Up @@ -229,4 +219,11 @@ protected void removeBackoff(TopicPartition topicPartition) {
this.backOffContexts.remove(topicPartition);
}
}

@Override
public void destroy() throws Exception {
if (this.kafkaConsumerTimingAdjuster instanceof DisposableBean) {
((DisposableBean) this.kafkaConsumerTimingAdjuster).destroy();
tomazfernandes marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,9 +22,11 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.retry.backoff.Sleeper;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;


Expand All @@ -39,7 +41,7 @@
* @since 2.7
* @see KafkaConsumerBackoffManager
*/
public class WakingKafkaConsumerTimingAdjuster implements KafkaConsumerTimingAdjuster {
public class WakingKafkaConsumerTimingAdjuster implements KafkaConsumerTimingAdjuster, DisposableBean {

private static final LogAccessor LOGGER =
new LogAccessor(LogFactory.getLog(WakingKafkaConsumerTimingAdjuster.class));
Expand All @@ -58,17 +60,31 @@ public class WakingKafkaConsumerTimingAdjuster implements KafkaConsumerTimingAdj

private final Sleeper sleeper;

/**
* Create an instance with the provided TaskExecutor and Sleeper.
* @param timingAdjustmentTaskExecutor the task executor.
* @param sleeper the sleeper.
*/
public WakingKafkaConsumerTimingAdjuster(TaskExecutor timingAdjustmentTaskExecutor, Sleeper sleeper) {
Assert.notNull(timingAdjustmentTaskExecutor, "Task executor cannot be null.");
Assert.notNull(sleeper, "Sleeper cannot be null.");
this.timingAdjustmentTaskExecutor = timingAdjustmentTaskExecutor;
this.sleeper = sleeper;
}

/**
* Create an instance with the provided TaskExecutor and a thread sleeper.
* @param timingAdjustmentTaskExecutor the task executor.
*/
public WakingKafkaConsumerTimingAdjuster(TaskExecutor timingAdjustmentTaskExecutor) {
Assert.notNull(timingAdjustmentTaskExecutor, "Task executor cannot be null.");
this.timingAdjustmentTaskExecutor = timingAdjustmentTaskExecutor;
this.sleeper = Thread::sleep;
this(timingAdjustmentTaskExecutor, Thread::sleep);
}

/**
* Create an instance with the default TaskExecutor and a thread sleeper.
*/
public WakingKafkaConsumerTimingAdjuster() {
this(createTaskExecutor(), Thread::sleep);
}

/**
Expand Down Expand Up @@ -138,4 +154,18 @@ private void doApplyTimingAdjustment(Consumer<?, ?> consumerForTimingAdjustment,
"for TopicPartition " + topicPartition);
}
}

private static TaskExecutor createTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
tomazfernandes marked this conversation as resolved.
Show resolved Hide resolved
taskExecutor.initialize();
return taskExecutor;
}

@Override
public void destroy() throws Exception {
if (this.timingAdjustmentTaskExecutor != null
&& this.timingAdjustmentTaskExecutor instanceof ThreadPoolTaskExecutor) {
((ThreadPoolTaskExecutor) this.timingAdjustmentTaskExecutor).shutdown();
}
}
}