From 0833b208e19ae397c65595237f94978004ccf78c Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 28 Jun 2022 16:19:38 -0400 Subject: [PATCH] GH-2239: Replace PartitionPausingBackOffManager New back of manager (and factory) that uses a task scheduler to resume the paused partitions. --- .../config/KafkaListenerEndpointRegistry.java | 22 ++++- .../kafka/listener/BackOffHandler.java | 21 ++++- ...ntainerPartitionPausingBackOffManager.java | 91 +++++++++++++++++++ ...PartitionPausingBackOffManagerFactory.java | 49 ++++++++++ .../ContainerPausingBackOffHandler.java | 7 ++ .../KafkaMessageListenerContainer.java | 9 ++ .../ListenerContainerPauseService.java | 21 ++++- .../listener/ListenerContainerRegistry.java | 13 ++- ...PartitionPausingBackOffManagerFactory.java | 2 + .../PartitionPausingBackoffManager.java | 4 +- .../RetryTopicComponentFactory.java | 4 +- .../RetryTopicConfigurationSupport.java | 82 +++++++++-------- 12 files changed, 280 insertions(+), 45 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManager.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManagerFactory.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java index dd55ab4c55..d112b162b6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java @@ -75,6 +75,8 @@ public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry, protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); //NOSONAR + private final Map unregisteredContainers = new ConcurrentHashMap<>(); + private final Map listenerContainers = new ConcurrentHashMap<>(); private int phase = AbstractMessageListenerContainer.DEFAULT_PHASE; @@ -109,6 +111,17 @@ public MessageListenerContainer getListenerContainer(String id) { return this.listenerContainers.get(id); } + @Override + @Nullable + public MessageListenerContainer getUnregisteredListenerContainer(String id) { + MessageListenerContainer container = this.unregisteredContainers.get(id); + if (container == null) { + refreshContextContainers(); + return this.unregisteredContainers.get(id); + } + return null; + } + /** * By default, containers registered for endpoints after the context is refreshed * are immediately started, regardless of their autoStartup property, to comply with @@ -156,10 +169,17 @@ public Collection getListenerContainers() { public Collection getAllListenerContainers() { List containers = new ArrayList<>(); containers.addAll(getListenerContainers()); - containers.addAll(this.applicationContext.getBeansOfType(MessageListenerContainer.class, true, false).values()); + refreshContextContainers(); + containers.addAll(this.unregisteredContainers.values()); return containers; } + private void refreshContextContainers() { + this.unregisteredContainers.clear(); + this.applicationContext.getBeansOfType(MessageListenerContainer.class, true, false).values() + .forEach(container -> this.unregisteredContainers.put(container.getListenerId(), container)); + } + /** * Create a message listener container for the given {@link KafkaListenerEndpoint}. *

This create the necessary infrastructure to honor that endpoint diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/BackOffHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/BackOffHandler.java index 4b7f3417b9..14193b9949 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/BackOffHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/BackOffHandler.java @@ -16,15 +16,18 @@ package org.springframework.kafka.listener; +import org.apache.kafka.common.TopicPartition; + import org.springframework.lang.Nullable; /** * Handler for the provided back off time, listener container and exception. + * Also supports back off for individual partitions. * - * @author Jan Marincek - * @since 2.9 + * @author Jan Marincek + * @author Gary Russell + * @since 2.9 */ -@FunctionalInterface public interface BackOffHandler { /** @@ -35,4 +38,16 @@ public interface BackOffHandler { */ void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff); + /** + * Perform the next back off for a partition. + * @param container the container. + * @param partition the partition. + * @param nextBackOff the next back off. + */ + default void onNextBackOff(@Nullable MessageListenerContainer container, TopicPartition partition, + long nextBackOff) { + + throw new UnsupportedOperationException(); + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManager.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManager.java new file mode 100644 index 0000000000..9f637eb044 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManager.java @@ -0,0 +1,91 @@ +/* + * Copyright 2018-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.common.TopicPartition; + +import org.springframework.core.log.LogAccessor; +import org.springframework.util.Assert; + +/** + * + * A manager that backs off consumption for a given topic if the timestamp provided is not + * due. Use with {@link DefaultErrorHandler} to guarantee that the message is read + * again after partition consumption is resumed (or seek it manually by other means). + * Note that when a record backs off the partition consumption gets paused for + * approximately that amount of time, so you must have a fixed backoff value per partition. + * + * @author Tomaz Fernandes + * @author Gary Russell + * @since 2.9 + * @see DefaultErrorHandler + */ +public class ContainerPartitionPausingBackOffManager implements KafkaConsumerBackoffManager { + + private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(KafkaConsumerBackoffManager.class)); + + private final ListenerContainerRegistry listenerContainerRegistry; + + private final BackOffHandler backOffHandler; + + /** + * Construct an instance with the provided registry and back off handler. + * @param listenerContainerRegistry + * @param backOffHandler + */ + public ContainerPartitionPausingBackOffManager(ListenerContainerRegistry listenerContainerRegistry, + BackOffHandler backOffHandler) { + + this.listenerContainerRegistry = listenerContainerRegistry; + this.backOffHandler = backOffHandler; + } + + /** + * Backs off if the current time is before the dueTimestamp provided + * in the {@link Context} object. + * @param context the back off context for this execution. + */ + @Override + public void backOffIfNecessary(Context context) { + long backoffTime = context.getDueTimestamp() - System.currentTimeMillis(); + LOGGER.debug(() -> "Back off time: " + backoffTime + " Context: " + context); + if (backoffTime > 0) { + pauseConsumptionAndThrow(context, backoffTime); + } + } + + private void pauseConsumptionAndThrow(Context context, Long backOffTime) throws KafkaBackoffException { + TopicPartition topicPartition = context.getTopicPartition(); + getListenerContainerFromContext(context).pausePartition(topicPartition); + this.backOffHandler.onNextBackOff(getListenerContainerFromContext(context), topicPartition, backOffTime); + throw new KafkaBackoffException(String.format("Partition %s from topic %s is not ready for consumption, " + + "backing off for approx. %s millis.", context.getTopicPartition().partition(), + context.getTopicPartition().topic(), backOffTime), + topicPartition, context.getListenerId(), context.getDueTimestamp()); + } + + private MessageListenerContainer getListenerContainerFromContext(Context context) { + MessageListenerContainer container = this.listenerContainerRegistry.getListenerContainer(context.getListenerId()); // NOSONAR + if (container == null) { + container = this.listenerContainerRegistry.getUnregisteredListenerContainer(context.getListenerId()); + } + Assert.notNull(container, () -> "No container found with id: " + context.getListenerId()); + return container; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManagerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManagerFactory.java new file mode 100644 index 0000000000..2292aaee8e --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManagerFactory.java @@ -0,0 +1,49 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +/** + * A factory for {@link ContainerPartitionPausingBackoffManager}. + * + * @author Gary Russell + * @since 2.9 + * + */ +public class ContainerPartitionPausingBackOffManagerFactory extends AbstractKafkaBackOffManagerFactory { + + private BackOffHandler backOffHandler; + + /** + * Construct an instance with the provided properties. + * @param listenerContainerRegistry the registry. + * @param backOffHandler the back off handler. + */ + public ContainerPartitionPausingBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry) { + + super(listenerContainerRegistry); + } + + @Override + protected KafkaConsumerBackoffManager doCreateManager(ListenerContainerRegistry registry) { + return new ContainerPartitionPausingBackOffManager(getListenerContainerRegistry(), this.backOffHandler); + } + + public void setBackOffHandler(BackOffHandler backOffHandler) { + this.backOffHandler = backOffHandler; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPausingBackOffHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPausingBackOffHandler.java index 2562f840ec..aff58d2a19 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPausingBackOffHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPausingBackOffHandler.java @@ -18,6 +18,8 @@ import java.time.Duration; +import org.apache.kafka.common.TopicPartition; + import org.springframework.lang.Nullable; /** @@ -51,4 +53,9 @@ public void onNextBackOff(@Nullable MessageListenerContainer container, Exceptio } } + @Override + public void onNextBackOff(MessageListenerContainer container, TopicPartition partition, long nextBackOff) { + this.pauser.pausePartition(container, partition, Duration.ofMillis(nextBackOff)); + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 879ac185ce..f843cf11f4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -318,6 +318,15 @@ public void resume() { } } + @Override + public void resumePartition(TopicPartition topicPartition) { + super.resumePartition(topicPartition); + KafkaMessageListenerContainer.ListenerConsumer consumer = this.listenerConsumer; + if (consumer != null) { + this.listenerConsumer.wakeIfNecessary(); + } + } + @Override public Map> metrics() { ListenerConsumer listenerConsumerForMetrics = this.listenerConsumer; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerPauseService.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerPauseService.java index 6fe241da2e..8dddd56c7a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerPauseService.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerPauseService.java @@ -22,6 +22,7 @@ import java.util.Optional; import org.apache.commons.logging.LogFactory; +import org.apache.kafka.common.TopicPartition; import org.springframework.core.log.LogAccessor; import org.springframework.lang.Nullable; @@ -81,13 +82,31 @@ public void pause(MessageListenerContainer messageListenerContainer, Duration pa } else { Instant resumeAt = Instant.now().plusMillis(pauseDuration.toMillis()); - LOGGER.debug(() -> "Pausing container " + messageListenerContainer + "resume scheduled for " + LOGGER.debug(() -> "Pausing container " + messageListenerContainer + ", resume scheduled for " + resumeAt.atZone(ZoneId.systemDefault()).toLocalDateTime()); messageListenerContainer.pause(); this.scheduler.schedule(() -> resume(messageListenerContainer), resumeAt); } } + /** + * Pause consumption from a given partition for the duration. + * @param messageListenerContainer the container. + * @param partition the partition. + * @param pauseDuration the duration. + */ + public void pausePartition(MessageListenerContainer messageListenerContainer, TopicPartition partition, + Duration pauseDuration) { + + Instant resumeAt = Instant.now().plusMillis(pauseDuration.toMillis()); + LOGGER.debug(() -> "Pausing container: " + messageListenerContainer + " partition: " + partition + + ", resume scheduled for " + + resumeAt.atZone(ZoneId.systemDefault()).toLocalDateTime()); + messageListenerContainer.pausePartition(partition); + this.scheduler.schedule(() -> messageListenerContainer.resumePartition(partition), resumeAt); + + } + /** * Resume the listener container by given id. * @param listenerId the id of the listener diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerRegistry.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerRegistry.java index 6420de1de8..968d3a8705 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerRegistry.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,6 +41,17 @@ public interface ListenerContainerRegistry { @Nullable MessageListenerContainer getListenerContainer(String id); + /** + * Return the {@link MessageListenerContainer} with the specified id or {@code null} + * if no such container exists. Returns containers that are not registered with the + * registry, but exist in the application context. + * @param id the id of the container + * @return the container or {@code null} if no container with that id exists + * @see #getListenerContainerIds() + */ + @Nullable + MessageListenerContainer getUnregisteredListenerContainer(String id); + /** * Return the ids of the managed {@link MessageListenerContainer} instance(s). * @return the ids. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/PartitionPausingBackOffManagerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/PartitionPausingBackOffManagerFactory.java index 63dc63f096..4cabed2b9a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/PartitionPausingBackOffManagerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/PartitionPausingBackOffManagerFactory.java @@ -29,10 +29,12 @@ * IMPORTANT: Since 2.9 this class doesn't create a {@link ThreadPoolTaskExecutor} * by default. In order for the factory to create a {@link KafkaConsumerTimingAdjuster}, * such thread executor must be provided. + * @deprecated in favor of {@link ContainerPartitionPausingBackOffManager}. * * @author Tomaz Fernandes * @since 2.7 */ +@Deprecated public class PartitionPausingBackOffManagerFactory extends AbstractKafkaBackOffManagerFactory { private boolean timingAdjustmentEnabled = true; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/PartitionPausingBackoffManager.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/PartitionPausingBackoffManager.java index 965a9359f9..32882ef3df 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/PartitionPausingBackoffManager.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/PartitionPausingBackoffManager.java @@ -39,12 +39,14 @@ * * Note that when a record backs off the partition consumption gets paused for * approximately that amount of time, so you must have a fixed backoff value per partition. + * @deprecated in favor of {@link ContainerPartitionPausingBackOffManager}. * * @author Tomaz Fernandes * @author Gary Russell * @since 2.7 * @see DefaultErrorHandler */ +@Deprecated public class PartitionPausingBackoffManager implements KafkaConsumerBackoffManager, ApplicationListener { @@ -137,7 +139,7 @@ public void backOffIfNecessary(Context context) { private void pauseConsumptionAndThrow(Context context, Long backOffTime) throws KafkaBackoffException { TopicPartition topicPartition = context.getTopicPartition(); getListenerContainerFromContext(context).pausePartition(topicPartition); - addBackoff(context, topicPartition); + throw new KafkaBackoffException(String.format("Partition %s from topic %s is not ready for consumption, " + "backing off for approx. %s millis.", context.getTopicPartition().partition(), context.getTopicPartition().topic(), backOffTime), diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicComponentFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicComponentFactory.java index cfae581f93..e378e321ba 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicComponentFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicComponentFactory.java @@ -21,12 +21,12 @@ import org.springframework.beans.factory.BeanFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpoint; +import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.KafkaBackOffManagerFactory; import org.springframework.kafka.listener.KafkaConsumerBackoffManager; import org.springframework.kafka.listener.ListenerContainerRegistry; import org.springframework.kafka.listener.MessageListenerContainer; -import org.springframework.kafka.listener.PartitionPausingBackOffManagerFactory; import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter; /** @@ -153,7 +153,7 @@ public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() { * @return the instance. */ public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry) { - return new PartitionPausingBackOffManagerFactory(registry); + return new ContainerPartitionPausingBackOffManagerFactory(registry); } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java index 390caaf63a..05ce2ced6b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java @@ -26,6 +26,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; @@ -37,18 +39,22 @@ import org.springframework.kafka.config.KafkaListenerEndpoint; import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory; +import org.springframework.kafka.listener.ContainerPausingBackOffHandler; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.listener.ExceptionClassifier; import org.springframework.kafka.listener.KafkaBackOffManagerFactory; import org.springframework.kafka.listener.KafkaConsumerBackoffManager; import org.springframework.kafka.listener.KafkaConsumerTimingAdjuster; +import org.springframework.kafka.listener.ListenerContainerPauseService; import org.springframework.kafka.listener.ListenerContainerRegistry; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.PartitionPausingBackOffManagerFactory; import org.springframework.kafka.listener.WakingKafkaConsumerTimingAdjuster; import org.springframework.kafka.support.JavaUtils; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.lang.Nullable; +import org.springframework.scheduling.TaskScheduler; import org.springframework.util.Assert; import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.FixedBackOff; @@ -272,13 +278,14 @@ protected Consumer configureDestinationTopicResolver() @Bean(name = KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME) public KafkaConsumerBackoffManager kafkaConsumerBackoffManager( @Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME) - ListenerContainerRegistry registry, - @Qualifier(BACK_OFF_MANAGER_THREAD_EXECUTOR_BEAN_NAME) TaskExecutor taskExecutor) { + ListenerContainerRegistry registry, @Nullable RetryTopicSchedulerWrapper wrapper, + @Nullable TaskScheduler taskScheduler) { KafkaBackOffManagerFactory backOffManagerFactory = this.componentFactory.kafkaBackOffManagerFactory(registry); - JavaUtils.INSTANCE.acceptIfInstanceOf(PartitionPausingBackOffManagerFactory.class, backOffManagerFactory, - factory -> configurePartitionPausingFactory(taskExecutor, factory)); + JavaUtils.INSTANCE.acceptIfInstanceOf(ContainerPartitionPausingBackOffManagerFactory.class, backOffManagerFactory, + factory -> configurePartitionPausingFactory(factory, registry, + wrapper != null ? wrapper.getScheduler() : taskScheduler)); return backOffManagerFactory.create(); } @@ -289,38 +296,12 @@ public KafkaConsumerBackoffManager kafkaConsumerBackoffManager( * {@link #configureKafkaBackOffManager} method for furher customization. * @param factory the factory instance. */ - private void configurePartitionPausingFactory(TaskExecutor taskExecutor, - PartitionPausingBackOffManagerFactory factory) { - KafkaBackOffManagerConfigurer configurer = new KafkaBackOffManagerConfigurer(); - configureKafkaBackOffManager(configurer); - Assert.isTrue(!configurer.timingAdjustmentEnabled - || configurer.maxThreadPoolSize == null - || ThreadPoolTaskExecutor.class.isAssignableFrom(taskExecutor.getClass()), - "TaskExecutor must be an instance of ThreadPoolTaskExecutor to set maxThreadPoolSize"); - factory.setTimingAdjustmentEnabled(configurer.timingAdjustmentEnabled); - if (ThreadPoolTaskExecutor.class.isAssignableFrom(taskExecutor.getClass())) { - JavaUtils.INSTANCE - .acceptIfNotNull(configurer.maxThreadPoolSize, poolSize -> ((ThreadPoolTaskExecutor) taskExecutor) - .setMaxPoolSize(poolSize)); - } - JavaUtils.INSTANCE - .acceptIfCondition(configurer.timingAdjustmentEnabled, taskExecutor, factory::setTaskExecutor) - .acceptIfNotNull(configurer.clock, factory::setClock); - } + private void configurePartitionPausingFactory(ContainerPartitionPausingBackOffManagerFactory factory, + ListenerContainerRegistry registry, @Nullable TaskScheduler scheduler) { - /** - * Create the {@link TaskExecutor} instance that will be used with the - * {@link WakingKafkaConsumerTimingAdjuster}, if timing adjustment is enabled. - * @return the instance. - */ - @Bean(name = BACK_OFF_MANAGER_THREAD_EXECUTOR_BEAN_NAME) - public TaskExecutor backoffManagerTaskExecutor() { - KafkaBackOffManagerConfigurer configurer = new KafkaBackOffManagerConfigurer(); - configureKafkaBackOffManager(configurer); - return configurer.timingAdjustmentEnabled - ? new ThreadPoolTaskExecutor() - : task -> { - }; + Assert.notNull(scheduler, "Either a RetryTopicSchedulerWrapper or TaskScheduler bean is required"); + factory.setBackOffHandler(new ContainerPausingBackOffHandler( + new ListenerContainerPauseService(registry, scheduler))); } /** @@ -479,4 +460,33 @@ public CustomizersConfigurer customizeDeadLetterPublishingRecoverer(Consumer