Skip to content

Commit

Permalink
Move Task Executor logic to Timing Adjuster
Browse files Browse the repository at this point in the history
  • Loading branch information
tomazfernandes committed Apr 14, 2022
1 parent 5dd5f37 commit b147075
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.springframework.kafka.retrytopic.SuffixingRetryTopicNamesProviderFactory;

/**
* Provides the component instances that will be used with
* Provide the component instances that will be used with
* {@link RetryTopicConfigurationSupport}. Override any of the methods to provide
* a different implementation or subclass, then override the
* {@link RetryTopicConfigurationSupport#createComponentFactory()} method
Expand All @@ -50,7 +50,7 @@ public class RetryTopicComponentFactory {
private final Clock internalRetryTopicClock = createInternalRetryTopicClock();

/**
* Creates the {@link RetryTopicConfigurer} that will serve as an entry point
* Create the {@link RetryTopicConfigurer} that will serve as an entry point
* for configuring non-blocking topic-based delayed retries for a given
* {@link KafkaListenerEndpoint} by processing the appropriate
* {@link RetryTopicConfiguration}.
Expand All @@ -76,7 +76,7 @@ public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor desti
}

/**
* Creates the {@link DestinationTopicProcessor} that will be used to process the
* Create the {@link DestinationTopicProcessor} that will be used to process the
* {@link DestinationTopic} instances and store them in the provided
* {@link DestinationTopicResolver}.
* @param destinationTopicResolver the {@link DestinationTopicResolver}
Expand All @@ -88,7 +88,7 @@ public DestinationTopicProcessor destinationTopicProcessor(DestinationTopicResol
}

/**
* Creates the instance of {@link DestinationTopicResolver} that will be used to store
* Create the instance of {@link DestinationTopicResolver} that will be used to store
* the {@link DestinationTopic} instances
* and resolve which a given record should be forwarded to.
*
Expand All @@ -99,7 +99,7 @@ public DestinationTopicResolver destinationTopicResolver() {
}

/**
* Creates a {@link DeadLetterPublishingRecovererFactory} that will be used to create
* Create a {@link DeadLetterPublishingRecovererFactory} that will be used to create
* the {@link DeadLetterPublishingRecoverer} that will forward the records to a given
* {@link DestinationTopic}.
*
Expand All @@ -113,7 +113,7 @@ public DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory
}

/**
* Creates the {@link ListenerContainerFactoryResolver} that will be used to resolve
* Create the {@link ListenerContainerFactoryResolver} that will be used to resolve
* the appropriate {@link KafkaListenerContainerFactory} for a given topic.
*
* @param beanFactory the {@link BeanFactory} that will be used to retrieve the
Expand All @@ -125,7 +125,7 @@ public ListenerContainerFactoryResolver listenerContainerFactoryResolver(BeanFac
}

/**
* Creates a {@link ListenerContainerFactoryConfigurer} that will be used to
* Create a {@link ListenerContainerFactoryConfigurer} that will be used to
* configure the {@link KafkaListenerContainerFactory} resolved by the
* {@link ListenerContainerFactoryResolver}.
* @param kafkaConsumerBackoffManager the {@link KafkaConsumerBackoffManager} used
Expand All @@ -143,7 +143,7 @@ public ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer(Kaf
}

/**
* Creates the {@link RetryTopicNamesProviderFactory} instance that will be used
* Create the {@link RetryTopicNamesProviderFactory} instance that will be used
* to provide the property names for the retry topics' {@link KafkaListenerEndpoint}.
* @return the instance.
*/
Expand All @@ -152,7 +152,7 @@ public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
}

/**
* Returns the {@link Clock} instance that will be used for all
* Return the {@link Clock} instance that will be used for all
* time-related operations in the retry topic processes.
* @return the instance.
*/
Expand All @@ -161,7 +161,7 @@ public Clock internalRetryTopicClock() {
}

/**
* Creates a {@link Clock} instance that will be used for all time-related operations
* Create a {@link Clock} instance that will be used for all time-related operations
* in the retry topic processes.
* @return the instance.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.stream.Stream;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -61,12 +60,10 @@
* @author Tomaz Fernandes
* @since 2.9
*/
public class RetryTopicConfigurationSupport implements DisposableBean {
public class RetryTopicConfigurationSupport {

private final RetryTopicComponentFactory componentFactory = createComponentFactory();

private DisposableBean disposableBackOffManagerFactory;

/**
* Return a global {@link RetryTopicConfigurer} for configuring retry topics
* for {@link KafkaListenerEndpoint} instances with a corresponding
Expand Down Expand Up @@ -272,7 +269,6 @@ public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(
@Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
ListenerContainerRegistry registry) {
PartitionPausingBackOffManagerFactory factory = new PartitionPausingBackOffManagerFactory(registry);
this.disposableBackOffManagerFactory = factory;
return factory.create();
}

Expand All @@ -292,13 +288,6 @@ RetryTopicBootstrapper retryTopicBootstrapper(ApplicationContext context) {
return new RetryTopicBootstrapper(context, context.getAutowireCapableBeanFactory());
}

@Override
public void destroy() throws Exception {
if (this.disposableBackOffManagerFactory != null) {
this.disposableBackOffManagerFactory.destroy();
}
}

public static class BlockingRetriesConfigurer {

private BackOff backOff;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

import java.time.Clock;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;

/**
Expand All @@ -31,8 +29,7 @@
* @author Tomaz Fernandes
* @since 2.7
*/
public class PartitionPausingBackOffManagerFactory extends AbstractKafkaBackOffManagerFactory
implements DisposableBean {
public class PartitionPausingBackOffManagerFactory extends AbstractKafkaBackOffManagerFactory {

private boolean timingAdjustmentEnabled = true;

Expand Down Expand Up @@ -150,24 +147,9 @@ private KafkaConsumerTimingAdjuster getOrCreateBackOffTimingAdjustmentManager()
if (this.timingAdjustmentManager != null) {
return this.timingAdjustmentManager;
}
return new WakingKafkaConsumerTimingAdjuster(getOrCreateTimingAdjustmentThreadExecutor());
return this.taskExecutor != null
? new WakingKafkaConsumerTimingAdjuster(this.taskExecutor)
: new WakingKafkaConsumerTimingAdjuster();
}

private TaskExecutor getOrCreateTimingAdjustmentThreadExecutor() {
if (this.taskExecutor != null) {
return this.taskExecutor;
}
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.initialize();
this.taskExecutor = executor;
return executor;
}

@Override
public void destroy() throws Exception {
if (this.taskExecutor != null
&& ThreadPoolTaskExecutor.class.isAssignableFrom(this.taskExecutor.getClass())) {
((ThreadPoolTaskExecutor) this.taskExecutor).shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.TopicPartition;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationListener;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.event.ListenerContainerPartitionIdleEvent;
Expand All @@ -32,7 +33,7 @@
/**
*
* A manager that backs off consumption for a given topic if the timestamp provided is not
* due. Use with {@link SeekToCurrentErrorHandler} to guarantee that the message is read
* due. Use with {@link DefaultErrorHandler} to guarantee that the message is read
* again after partition consumption is resumed (or seek it manually by other means).
* It's also necessary to set a {@link ContainerProperties#setIdlePartitionEventInterval(Long)}
* so the Manager can resume the partition consumption.
Expand All @@ -43,10 +44,10 @@
* @author Tomaz Fernandes
* @author Gary Russell
* @since 2.7
* @see SeekToCurrentErrorHandler
* @see DefaultErrorHandler
*/
public class PartitionPausingBackoffManager implements KafkaConsumerBackoffManager,
ApplicationListener<ListenerContainerPartitionIdleEvent> {
ApplicationListener<ListenerContainerPartitionIdleEvent>, DisposableBean {

private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(KafkaConsumerBackoffManager.class));

Expand Down Expand Up @@ -75,10 +76,7 @@ public class PartitionPausingBackoffManager implements KafkaConsumerBackoffManag
public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContainerRegistry,
KafkaConsumerTimingAdjuster kafkaConsumerTimingAdjuster) {

this.listenerContainerRegistry = listenerContainerRegistry;
this.kafkaConsumerTimingAdjuster = kafkaConsumerTimingAdjuster;
this.clock = Clock.systemUTC();
this.backOffContexts = new HashMap<>();
this(listenerContainerRegistry, kafkaConsumerTimingAdjuster, Clock.systemUTC());
}

/**
Expand All @@ -91,11 +89,7 @@ public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContaine
* @param listenerContainerRegistry the listenerContainerRegistry to use.
*/
public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContainerRegistry) {

this.listenerContainerRegistry = listenerContainerRegistry;
this.kafkaConsumerTimingAdjuster = null;
this.clock = Clock.systemUTC();
this.backOffContexts = new HashMap<>();
this(listenerContainerRegistry, null, Clock.systemUTC());
}

/**
Expand All @@ -107,7 +101,7 @@ public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContaine
* @param clock the clock to use.
*/
public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContainerRegistry,
KafkaConsumerTimingAdjuster kafkaConsumerTimingAdjuster,
@Nullable KafkaConsumerTimingAdjuster kafkaConsumerTimingAdjuster,
Clock clock) {

this.listenerContainerRegistry = listenerContainerRegistry;
Expand All @@ -124,11 +118,7 @@ public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContaine
* @param clock the clock to use.
*/
public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContainerRegistry, Clock clock) {

this.listenerContainerRegistry = listenerContainerRegistry;
this.clock = clock;
this.kafkaConsumerTimingAdjuster = null;
this.backOffContexts = new HashMap<>();
this(listenerContainerRegistry, null, clock);
}

/**
Expand Down Expand Up @@ -229,4 +219,11 @@ protected void removeBackoff(TopicPartition topicPartition) {
this.backOffContexts.remove(topicPartition);
}
}

@Override
public void destroy() throws Exception {
if (this.kafkaConsumerTimingAdjuster instanceof DisposableBean) {
((DisposableBean) this.kafkaConsumerTimingAdjuster).destroy();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,9 +22,11 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.retry.backoff.Sleeper;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;


Expand All @@ -39,7 +41,7 @@
* @since 2.7
* @see KafkaConsumerBackoffManager
*/
public class WakingKafkaConsumerTimingAdjuster implements KafkaConsumerTimingAdjuster {
public class WakingKafkaConsumerTimingAdjuster implements KafkaConsumerTimingAdjuster, DisposableBean {

private static final LogAccessor LOGGER =
new LogAccessor(LogFactory.getLog(WakingKafkaConsumerTimingAdjuster.class));
Expand All @@ -58,17 +60,31 @@ public class WakingKafkaConsumerTimingAdjuster implements KafkaConsumerTimingAdj

private final Sleeper sleeper;

/**
* Create an instance with the provided TaskExecutor and Sleeper.
* @param timingAdjustmentTaskExecutor the task executor.
* @param sleeper the sleeper.
*/
public WakingKafkaConsumerTimingAdjuster(TaskExecutor timingAdjustmentTaskExecutor, Sleeper sleeper) {
Assert.notNull(timingAdjustmentTaskExecutor, "Task executor cannot be null.");
Assert.notNull(sleeper, "Sleeper cannot be null.");
this.timingAdjustmentTaskExecutor = timingAdjustmentTaskExecutor;
this.sleeper = sleeper;
}

/**
* Create an instance with the provided TaskExecutor and a thread sleeper.
* @param timingAdjustmentTaskExecutor the task executor.
*/
public WakingKafkaConsumerTimingAdjuster(TaskExecutor timingAdjustmentTaskExecutor) {
Assert.notNull(timingAdjustmentTaskExecutor, "Task executor cannot be null.");
this.timingAdjustmentTaskExecutor = timingAdjustmentTaskExecutor;
this.sleeper = Thread::sleep;
this(timingAdjustmentTaskExecutor, Thread::sleep);
}

/**
* Create an instance with the default TaskExecutor and a thread sleeper.
*/
public WakingKafkaConsumerTimingAdjuster() {
this(createTaskExecutor(), Thread::sleep);
}

/**
Expand Down Expand Up @@ -138,4 +154,18 @@ private void doApplyTimingAdjustment(Consumer<?, ?> consumerForTimingAdjustment,
"for TopicPartition " + topicPartition);
}
}

private static TaskExecutor createTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.initialize();
return taskExecutor;
}

@Override
public void destroy() throws Exception {
if (this.timingAdjustmentTaskExecutor != null
&& this.timingAdjustmentTaskExecutor instanceof ThreadPoolTaskExecutor) {
((ThreadPoolTaskExecutor) this.timingAdjustmentTaskExecutor).shutdown();
}
}
}

0 comments on commit b147075

Please sign in to comment.