From e1160fa75a599387134c052f0ebd96359a8fd2b6 Mon Sep 17 00:00:00 2001 From: Tomaz Fernandes <76525045+tomazfernandes@users.noreply.github.com> Date: Thu, 28 Apr 2022 13:18:10 -0300 Subject: [PATCH] GH-2172: Expose Retry Topic Chain at Runtime (#2245) * GH-2172: Expose Retry Topic Chain at Runtime Resolves https://github.com/spring-projects/spring-kafka/issues/2172 * Add methods to `DestinationTopicContainer` interface * Reduce memory footprint of `DefaultDestinationTopicResolver` * Add documentation * Add to integration test --- .../src/main/asciidoc/retrytopic.adoc | 12 +++++++ .../DefaultDestinationTopicResolver.java | 33 +++++++++++-------- .../retrytopic/DestinationTopicContainer.java | 29 ++++++++++++++-- .../DefaultDestinationTopicResolverTests.java | 20 +++++++++++ .../RetryTopicIntegrationTests.java | 7 +++- 5 files changed, 84 insertions(+), 17 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index 871930aa41..e8ca7b3d58 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -848,6 +848,18 @@ protected Consumer configureRetryTopicConfigurer() { ---- ==== +[[access-topic-info-runtime]] +==== Accessing Topics' Information at Runtime + +Since 2.9, you can access information regarding the topic chain at runtime by injecting the provided `DestinationTopicContainer` bean. +This interface provides methods to look up the next topic in the chain or the DLT for a topic if configured, as well as useful properties such as the topic's name, delay and type. + +As a real-world use-case example, you can use such information so a console application can resend a record from the DLT to the first retry topic in the chain after the cause of the failed processing, e.g. bug / inconsistent state, has been resolved. + +IMPORTANT: The `DestinationTopic` provided by the `DestinationTopicContainer#getNextDestinationTopicFor()` method corresponds to the next topic registered in the chain for the input topic. +The actual topic the message will be forwarded to may differ due to different factors such as exception classification, number of attempts or single-topic fixed-delay strategies. +Use the `DestinationTopicResolver` interface if you need to weigh in these factors. + [[change-kboe-logging-level]] ==== Changing KafkaBackOffException Logging Level diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java index bdaadbae02..d992a46b25 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java @@ -34,11 +34,12 @@ import org.springframework.kafka.listener.ExceptionClassifier; import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.kafka.listener.TimestampedException; +import org.springframework.lang.Nullable; /** * - * Default implementation of the DestinationTopicResolver interface. + * Default implementation of the {@link DestinationTopicResolver} interface. * The container is closed when a {@link ContextRefreshedEvent} is received * and no more destinations can be added after that. * @@ -58,8 +59,6 @@ public class DefaultDestinationTopicResolver extends ExceptionClassifier private final Map sourceDestinationsHolderMap; - private final Map destinationsTopicMap; - private final Clock clock; private ApplicationContext applicationContext; @@ -81,7 +80,6 @@ public DefaultDestinationTopicResolver(Clock clock, ApplicationContext applicati public DefaultDestinationTopicResolver(Clock clock) { this.clock = clock; this.sourceDestinationsHolderMap = new HashMap<>(); - this.destinationsTopicMap = new HashMap<>(); this.contextRefreshed = false; } @@ -103,7 +101,7 @@ public DestinationTopic resolveDestinationTopic(String topic, Integer attempt, E && isNotFatalException(e) && !isPastTimout(originalTimestamp, destinationTopicHolder) ? resolveRetryDestination(destinationTopicHolder) - : resolveDltOrNoOpsDestination(topic); + : getDltOrNoOpsDestination(topic); } private Boolean isNotFatalException(Exception e) { @@ -140,18 +138,28 @@ private DestinationTopic resolveRetryDestination(DestinationTopicHolder destinat @Override public DestinationTopic getDestinationTopicByName(String topic) { - return Objects.requireNonNull(this.destinationsTopicMap.get(topic), - () -> "No topic found for " + topic); + return Objects.requireNonNull(this.sourceDestinationsHolderMap.get(topic), + () -> "No DestinationTopic found for " + topic).getSourceDestination(); + } + + @Nullable + @Override + public DestinationTopic getDltFor(String topicName) { + DestinationTopic destination = getDltOrNoOpsDestination(topicName); + return destination.isNoOpsTopic() + ? null + : destination; } - private DestinationTopic resolveDltOrNoOpsDestination(String topic) { - DestinationTopic destination = getDestinationFor(topic); + private DestinationTopic getDltOrNoOpsDestination(String topic) { + DestinationTopic destination = getNextDestinationTopicFor(topic); return destination.isDltTopic() || destination.isNoOpsTopic() ? destination - : resolveDltOrNoOpsDestination(destination.getDestinationName()); + : getDltOrNoOpsDestination(destination.getDestinationName()); } - private DestinationTopic getDestinationFor(String topic) { + @Override + public DestinationTopic getNextDestinationTopicFor(String topic) { return getDestinationHolderFor(topic).getNextDestination(); } @@ -179,9 +187,6 @@ public void addDestinationTopics(List destinationsToAdd) { + DefaultDestinationTopicResolver.class.getSimpleName() + " is already refreshed."); } synchronized (this.sourceDestinationsHolderMap) { - this.destinationsTopicMap.putAll(destinationsToAdd - .stream() - .collect(Collectors.toMap(destination -> destination.getDestinationName(), destination -> destination))); this.sourceDestinationsHolderMap.putAll(correlatePairSourceAndDestinationValues(destinationsToAdd)); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java index 18d59c08cb..23cbd3ba35 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ import java.util.List; +import org.springframework.lang.Nullable; + /** * * Provides methods to store and retrieve {@link DestinationTopic} instances. @@ -34,10 +36,33 @@ public interface DestinationTopicContainer { void addDestinationTopics(List destinationTopics); /** - * Returns the DestinationTopic instance registered for that topic. + * Returns the {@link DestinationTopic} instance registered for that topic. * @param topicName the topic name of the DestinationTopic to be returned. * @return the DestinationTopic instance registered for that topic. */ DestinationTopic getDestinationTopicByName(String topicName); + /** + * Returns the {@link DestinationTopic} instance registered as the next + * destination topic in the chain for the given topic. + * Note that this might not correspond to the actual next topic a message will + * be forwarded to, since that depends on different factors. + * + * If you need to find out the exact next topic for a message use the + * {@link DestinationTopicResolver#resolveDestinationTopic(String, Integer, Exception, long)} + * method instead. + * @param topicName the topic name of the DestinationTopic to be returned. + * @return the next DestinationTopic in the chain registered for that topic. + */ + DestinationTopic getNextDestinationTopicFor(String topicName); + + /** + * Returns the {@link DestinationTopic} instance registered as + * DLT for the given topic, or null if none is found. + * @param topicName the topic name for which to look the DLT for + * @return The {@link DestinationTopic} instance corresponding to the DLT. + */ + @Nullable + DestinationTopic getDltFor(String topicName); + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java index 31867c241d..766894b067 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java @@ -159,6 +159,26 @@ void shouldResolveDltDestinationForExpiredTimeout() { 1, new IllegalArgumentException(), timestampInThePastToForceTimeout)).isEqualTo(dltDestinationTopic2); } + @Test + void shouldGetDestinationTopic() { + assertThat(defaultDestinationTopicContainer + .getDestinationTopicByName(mainDestinationTopic.getDestinationName())).isEqualTo(mainDestinationTopic); + } + + @Test + void shouldGetNextDestinationTopic() { + assertThat(defaultDestinationTopicContainer + .getNextDestinationTopicFor(mainDestinationTopic.getDestinationName())) + .isEqualTo(firstRetryDestinationTopic); + } + + @Test + void shouldGetDlt() { + assertThat(defaultDestinationTopicContainer + .getDltFor(mainDestinationTopic.getDestinationName())) + .isEqualTo(dltDestinationTopic); + } + @Test void shouldThrowIfNoDestinationFound() { assertThatNullPointerException().isThrownBy(() -> defaultDestinationTopicContainer.resolveDestinationTopic("Non-existing-topic", 0, diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java index 338b79cf1f..57ce304031 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java @@ -107,10 +107,15 @@ public class RetryTopicIntegrationTests { @Autowired private CountDownLatchContainer latchContainer; + @Autowired + DestinationTopicContainer topicContainer; + @Test void shouldRetryFirstTopic() { logger.debug("Sending message to topic " + FIRST_TOPIC); kafkaTemplate.send(FIRST_TOPIC, "Testing topic 1"); + assertThat(topicContainer.getNextDestinationTopicFor(FIRST_TOPIC).getDestinationName()) + .isEqualTo("myRetryTopic1-retry"); assertThat(awaitLatch(latchContainer.countDownLatch1)).isTrue(); assertThat(awaitLatch(latchContainer.customDltCountdownLatch)).isTrue(); assertThat(awaitLatch(latchContainer.customErrorHandlerCountdownLatch)).isTrue(); @@ -173,7 +178,7 @@ private boolean awaitLatch(CountDownLatch latch) { static class FirstTopicListener { @Autowired - DestinationTopicResolver resolver; + DestinationTopicContainer topicContainer; @Autowired CountDownLatchContainer container;