GH-2239: RetryableTopic Refactoring
Resolves #2239

* GH-2239: Replace PartitionPausingBackOffManager
New back off manager (and factory) that uses a task scheduler to resume
the paused partitions.
Revert change to deprecated PartitionPausingBackoffManager.
Log resume.
* Remove legacy code.
Also fix unrelated race in EKIT.
Only allow one `RetryTemplateConfigurationSupport` bean.
* Fix static var.
* Docs.
* More docs.
* Remove more dead/deprecated code.
* Address PR Comments.
* Fix RetryTopicConfigurer bean retrieval.
* Remove unnecessary casts in doc.
garyrussell committed Jul 7, 2022
1 parent a63359c commit 37910b6
Showing 43 changed files with 602 additions and 2,400 deletions.
4 changes: 2 additions & 2 deletions spring-kafka-docs/src/main/asciidoc/index.adoc
@@ -47,12 +47,12 @@ The <<kafka,main chapter>> covers the core classes to develop a Kafka applicatio

include::kafka.adoc[]

include::streams.adoc[]

include::testing.adoc[]

include::retrytopic.adoc[]

[[tips-n-tricks]]
== Tips, Tricks and Examples

2 changes: 1 addition & 1 deletion spring-kafka-docs/src/main/asciidoc/kafka.adoc
@@ -5173,7 +5173,7 @@ This new error handler replaces the `SeekToCurrentErrorHandler` and `RecoveringB
One difference is that the fallback behavior for batch listeners (when an exception other than a `BatchListenerFailedException` is thrown) is the equivalent of the <<retrying-batch-eh>>.

IMPORTANT: Starting with version 2.9, the `DefaultErrorHandler` can be configured to provide the same semantics as seeking the unprocessed record offsets as discussed below, but without actually seeking.
Instead, the records are retained by the listener container and resubmitted to the listener after the error handler exits (and after performing a single paused `poll()`, to keep the consumer alive; if <<retry-topic>> or a `ContainerPausingBackOffHandler` are being used, the pause may extend over multiple polls).
The error handler returns a result to the container that indicates whether the current failing record can be resubmitted, or if it was recovered and then it will not be sent to the listener again.
To enable this mode, set the property `seekAfterError` to `false`.

50 changes: 29 additions & 21 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
@@ -5,6 +5,10 @@ IMPORTANT: This is an experimental feature and the usual rule of no breaking API
Users are encouraged to try out the feature and provide feedback via GitHub Issues or GitHub discussions.
This applies to the API only; the feature itself is considered complete and robust.

Version 2.9 changed the mechanism to bootstrap infrastructure beans; see <<retry-config>> for the two mechanisms that are now required to bootstrap the feature.

After these changes, we intend to remove the experimental designation, probably in version 3.0.

Achieving non-blocking retry / dlt functionality with Kafka usually requires setting up extra topics and creating and configuring the corresponding listeners.
Since 2.7, Spring for Apache Kafka has offered support for this via the `@RetryableTopic` annotation and the `RetryTopicConfiguration` class, to simplify that bootstrapping.

@@ -33,28 +37,23 @@ If one message's processing takes longer than the next message's back off period
Also, for short delays (about 1s or less), the maintenance work the thread has to do, such as committing offsets, may delay the message processing execution.
The precision can also be affected if the retry topic's consumer is handling more than one partition, because we rely on waking up the consumer from polling and having full pollTimeouts to make timing adjustments.

That said, for consumers handling a single partition, the message's processing should occur very close to its due time in most situations.

IMPORTANT: It is guaranteed that a message will never be processed before its due time.

===== Tuning the Delay Precision

The message's processing delay precision relies on two `ContainerProperties`: `ContainerProperties.pollTimeout` and `ContainerProperties.idlePartitionEventInterval`.
Both properties will be automatically set in the retry topic and dlt's `ListenerContainerFactory` to one quarter of the smallest delay value for that topic, with a minimum value of 250ms and a maximum value of 5000ms.
These values will only be set if the property has its default values - if you change either value yourself your change will not be overridden.
This way you can tune the precision and performance for the retry topics if you need to.
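The quarter-of-the-smallest-delay rule described above can be sketched as a simple clamp; `pollTimeoutFor` is a hypothetical helper for illustration, not part of the framework:

```java
public class DelayPrecision {

    // One quarter of the smallest back off delay for the topic,
    // clamped to the documented [250 ms, 5000 ms] range.
    public static long pollTimeoutFor(long smallestDelayMs) {
        long quarter = smallestDelayMs / 4;
        return Math.max(250L, Math.min(5000L, quarter));
    }

    public static void main(String[] args) {
        System.out.println(pollTimeoutFor(1000));   // 250 (floor applies)
        System.out.println(pollTimeoutFor(8000));   // 2000
        System.out.println(pollTimeoutFor(60000));  // 5000 (cap applies)
    }
}
```

Because the clamp only applies when the properties still have their default values, setting either property yourself bypasses this computation entirely.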

NOTE: You can have separate `ListenerContainerFactory` instances for the main and retry topics - this way you can have different settings to better suit your needs, such as having a higher polling timeout setting for the main topics and a lower one for the retry topics.

[[retry-config]]
==== Configuration

Starting with version 2.9, for default configuration, the `@EnableKafkaRetryTopic` annotation should be used in a `@Configuration` annotated class.
This enables the feature to bootstrap properly and allows the feature's components to be injected and looked up at runtime.

NOTE: You do not need to also add `@EnableKafka` if you use this annotation, because `@EnableKafkaRetryTopic` is meta-annotated with `@EnableKafka`.

Also, starting with that version, for more advanced configuration of the feature's components and global features, the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the appropriate methods overridden.
For more details refer to <<retry-topic-global-settings>>.

IMPORTANT: Only one of the above techniques can be used, and only one `@Configuration` class can extend `RetryTopicConfigurationSupport`.
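As a sketch, the two mutually exclusive bootstrap mechanisms might look like the following (class names are illustrative; spring-kafka 2.9 must be on the classpath):

```java
// Option 1 - default configuration:
@EnableKafkaRetryTopic
@Configuration
public class RetryTopicBootstrapConfig {
}

// Option 2 - advanced configuration (do NOT combine with option 1;
// only one @Configuration class may extend the support class):
@Configuration
public class MyRetryTopicSupport extends RetryTopicConfigurationSupport {
    // override the appropriate methods here
}
```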

===== Using the `@RetryableTopic` annotation

To configure the retry topic and dlt for a `@KafkaListener` annotated method, you just have to add the `@RetryableTopic` annotation to it and Spring for Apache Kafka will bootstrap all the necessary topics and consumers with the default configurations.
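A minimal sketch of such a listener follows (the attempt count and back off values are illustrative):

```java
@RetryableTopic(attempts = "4", backoff = @Backoff(delay = 1000, multiplier = 2.0))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
```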
@@ -161,9 +160,9 @@ It's best to use a single `RetryTopicConfiguration` bean for configuration of su
[[retry-topic-global-settings]]
===== Configuring Global Settings and Features

Since 2.9, the previous bean overriding approach for configuring components has been removed (without deprecation, due to the aforementioned experimental nature of the API).
This does not change the `RetryTopicConfiguration` beans approach - only infrastructure components' configurations.
Now the `RetryTopicConfigurationSupport` class should be extended in a (single) `@Configuration` class, and the proper methods overridden.
An example follows:

====
@@ -185,6 +184,15 @@ public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
// Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
customizersConfigurer.customizeErrorHandler(eh -> {
eh.setSeekAfterError(false);
});
}
}
----
====
@@ -629,7 +637,7 @@ As an example the following implementation, in addition to the standard suffix,
----
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(
DestinationTopic.Properties properties) {
@@ -728,7 +736,7 @@ In the latter the consumer ends the execution without forwarding the message.
----
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
@@ -777,7 +785,7 @@ In this case, after retries are exhausted, the processing simply ends.
----
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
@@ -872,8 +880,8 @@ For example, to change the logging level to WARN you might add:
----
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeErrorHandler(defaultErrorHandler ->
defaultErrorHandler.setLogLevel(KafkaException.Level.WARN))
}
----
====
@@ -58,8 +58,6 @@
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.Scope;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
@@ -188,6 +186,8 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>

private AnnotationEnhancer enhancer;

private RetryTopicConfigurer retryTopicConfigurer;

@Override
public int getOrder() {
return LOWEST_PRECEDENCE;
@@ -510,27 +510,18 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
}

private RetryTopicConfigurer getRetryTopicConfigurer() {
if (this.retryTopicConfigurer == null) {
try {
this.retryTopicConfigurer = this.beanFactory
.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class);
}
catch (NoSuchBeanDefinitionException ex) {
this.logger.error("A 'RetryTopicConfigurer' with name "
+ RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME + " is required.");
throw ex;
}
}
return this.retryTopicConfigurer;
}

private Method checkProxy(Method methodArg, Object bean) {
@@ -76,8 +76,6 @@ public class RetryableTopicAnnotationProcessor {

private final BeanExpressionContext expressionContext;

/**
* Construct an instance using the provided parameters and default resolver,
* expression context.
@@ -214,26 +212,15 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean
}
}
try {
return this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME,
KafkaOperations.class);
}
catch (NoSuchBeanDefinitionException ex2) {
KafkaOperations<?, ?> kafkaOps = this.beanFactory.getBeanProvider(KafkaOperations.class).getIfUnique();
Assert.state(kafkaOps != null, () -> "A single KafkaTemplate bean could not be found in the context; "
+ "a single instance must exist, or one specifically named "
+ RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME);
return kafkaOps;
}
}
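The fallback order above (prefer the bean registered under the well-known name, otherwise require exactly one candidate, mirroring `getBeanProvider(...).getIfUnique()`) can be modeled with a plain map; this is a toy model for illustration, not Spring code:

```java
import java.util.Map;

public class TemplateLookup {

    // Toy model of the resolution order: prefer the bean registered under the
    // well-known name; otherwise require exactly one candidate in the context.
    static String resolve(Map<String, String> beans, String preferredName) {
        String named = beans.get(preferredName);
        if (named != null) {
            return named;
        }
        if (beans.size() == 1) {
            return beans.values().iterator().next();
        }
        throw new IllegalStateException(
                "A single KafkaTemplate bean could not be found in the context");
    }
}
```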

@@ -75,6 +75,8 @@ public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry,

protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); //NOSONAR

private final Map<String, MessageListenerContainer> unregisteredContainers = new ConcurrentHashMap<>();

private final Map<String, MessageListenerContainer> listenerContainers = new ConcurrentHashMap<>();

private int phase = AbstractMessageListenerContainer.DEFAULT_PHASE;
@@ -109,6 +111,17 @@ public MessageListenerContainer getListenerContainer(String id) {
return this.listenerContainers.get(id);
}

@Override
@Nullable
public MessageListenerContainer getUnregisteredListenerContainer(String id) {
MessageListenerContainer container = this.unregisteredContainers.get(id);
if (container == null) {
refreshContextContainers();
return this.unregisteredContainers.get(id);
}
return container;
}

/**
* By default, containers registered for endpoints after the context is refreshed
* are immediately started, regardless of their autoStartup property, to comply with
@@ -156,10 +169,17 @@ public Collection<MessageListenerContainer> getListenerContainers() {
public Collection<MessageListenerContainer> getAllListenerContainers() {
List<MessageListenerContainer> containers = new ArrayList<>();
containers.addAll(getListenerContainers());
refreshContextContainers();
containers.addAll(this.unregisteredContainers.values());
return containers;
}

private void refreshContextContainers() {
this.unregisteredContainers.clear();
this.applicationContext.getBeansOfType(MessageListenerContainer.class, true, false).values()
.forEach(container -> this.unregisteredContainers.put(container.getListenerId(), container));
}
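The intended lookup pattern above (consult the cache, refresh from the application context on a miss, then look again, returning the cached container when present) can be sketched with plain maps; `ContainerCache` and the `String` values are illustrative stand-ins, not framework types:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ContainerCache {

    private final Map<String, String> unregistered = new ConcurrentHashMap<>();
    private final Map<String, String> contextBeans; // stand-in for the ApplicationContext

    public ContainerCache(Map<String, String> contextBeans) {
        this.contextBeans = contextBeans;
    }

    // Consult the cache first; on a miss, refresh from the context and retry.
    public String getUnregistered(String id) {
        String container = this.unregistered.get(id);
        if (container == null) {
            refresh();
            return this.unregistered.get(id);
        }
        return container;
    }

    // Mirror of refreshContextContainers(): rebuild the cache from the context.
    private void refresh() {
        this.unregistered.clear();
        this.unregistered.putAll(this.contextBeans);
    }
}
```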

/**
* Create a message listener container for the given {@link KafkaListenerEndpoint}.
* <p>This creates the necessary infrastructure to honor that endpoint
@@ -25,6 +25,7 @@
* Base class for {@link KafkaBackOffManagerFactory} implementations.
*
* @author Tomaz Fernandes
* @author Gary Russell
* @since 2.7
* @see KafkaConsumerBackoffManager
*/
@@ -35,24 +36,23 @@ public abstract class AbstractKafkaBackOffManagerFactory

private ListenerContainerRegistry listenerContainerRegistry;

/**
* Creates an instance with the provided {@link ListenerContainerRegistry},
* which will be used to fetch the {@link MessageListenerContainer} to back off.
* @param listenerContainerRegistry the listenerContainerRegistry to use.
*/
public AbstractKafkaBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry) {
this.listenerContainerRegistry = listenerContainerRegistry;
}

/**
* Creates an instance that will retrieve the {@link ListenerContainerRegistry} from
* the {@link ApplicationContext}.
*/
public AbstractKafkaBackOffManagerFactory() {
this.listenerContainerRegistry = null;
}

/**
* Sets the {@link ListenerContainerRegistry}, that will be used to fetch the
* {@link MessageListenerContainer} to back off.
@@ -90,4 +90,5 @@ protected <T> T getBean(String beanName, Class<T> beanClass) {
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}

}
@@ -16,15 +16,18 @@

package org.springframework.kafka.listener;

import org.apache.kafka.common.TopicPartition;

import org.springframework.lang.Nullable;

/**
* Handler for the provided back off time, listener container and exception.
* Also supports back off for individual partitions.
*
* @author Jan Marincek
* @author Gary Russell
* @since 2.9
*/
public interface BackOffHandler {

/**
@@ -33,6 +36,20 @@ public interface BackOffHandler {
* @param exception the exception.
* @param nextBackOff the next back off.
*/
default void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) {
throw new UnsupportedOperationException();
}

/**
* Perform the next back off for a partition.
* @param container the container.
* @param partition the partition.
* @param nextBackOff the next back off.
*/
default void onNextBackOff(@Nullable MessageListenerContainer container, TopicPartition partition,
long nextBackOff) {

throw new UnsupportedOperationException();
}

}
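The default-method pattern above lets an implementation override only the variant it supports, while the unsupported variant fails fast. The following self-contained sketch uses hypothetical stand-ins for the framework types:

```java
public class BackOffDemo {

    // Hypothetical stand-ins for the framework types (MessageListenerContainer,
    // TopicPartition), just to keep the sketch self-contained.
    interface Container { }

    record Partition(String topic, int number) { }

    interface BackOffHandler {

        // Both variants default to throwing, so an implementation only
        // needs to override the variant it actually supports.
        default void onNextBackOff(Container container, Exception exception, long nextBackOff) {
            throw new UnsupportedOperationException();
        }

        default void onNextBackOff(Container container, Partition partition, long nextBackOff) {
            throw new UnsupportedOperationException();
        }
    }

    // A partition-aware handler overrides only the partition variant.
    static class PartitionHandler implements BackOffHandler {

        long lastBackOff = -1;

        @Override
        public void onNextBackOff(Container container, Partition partition, long nextBackOff) {
            this.lastBackOff = nextBackOff;
        }
    }
}
```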
