GH-2226: Add RetryTopicConfigurationSupport #2227
At a glance this is great!
👍
spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableRetryTopic.java
.../src/main/java/org/springframework/kafka/config/KafkaBackOffManagerConfigurationSupport.java
...main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
spring-kafka/src/main/java/org/springframework/kafka/config/RetryTopicConfigurationSupport.java
...a/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java
...a/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java
spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableRetryTopic.java
I've made the suggested changes - please let me know if there's anything else. If code-wise this looks ok, I can start working on tests and reviewing the javadocs. If we want to make I'm working under the assumption that we'll be releasing this in 2.9 - if that's not the case please let me know so I can update the Thanks. |
spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java
I was not too happy with the solution for the @artembilan, if you have the time to take a look at these changes, I'd appreciate your input. Thanks. |
Some further review.
Thanks
spring-kafka/src/main/java/org/springframework/kafka/config/RetryTopicComponentFactory.java
.../src/main/java/org/springframework/kafka/listener/PartitionPausingBackOffManagerFactory.java
spring-kafka/src/main/java/org/springframework/kafka/config/RetryTopicConfigurationSupport.java
I am inclined to push out the 2.9 M1 for up to a week, since we are still in review mode here. WDYT? |
TBH, I wasn't aware we had a release date for M1 😄 Yeah, I think the 25th should be doable. Anything happens, I'll let you know and we can either push the date a bit further or leave this for M2, if that works for you. |
I scheduled it for the 18th because it's easier for me to do all the releases the same day. Moving to the 25th (which doesn't have to be firm). AFAICT, the only one we need for Monday is #2223 |
Sure, sounds good, thanks.
Yes, I'm working on that right now. I noticed since the Also, I couldn't really figure out a worthwhile way to adjust the integration tests so that they'd fail with the current logic - I think I'd need a way of setting the partition's offset to 1 before the container is created, and make sure that when the container starts it resets back to 0. If you have any pointers on that please let me know, probably on that PR so we don't hijack this one. Thanks
Resolves spring-projects#2226
Add RetryTopicConfigurationSupport
Add @EnableRetryTopic
Add KafkaBackOffManagerConfigurationSupport
* Change deprecated constants for strings
* Remove deprecation suppressions
* Remove ApplicationContext and BeanFactory from constructors
* Change TaskExecutor shutdown logic to DisposableBean
Added unit tests, if you care to take a look. It gives a nice perspective on how configurations can be achieved. Also, happy to say that on aggregate (UT + IT), the new classes have 100% test coverage, and the Thanks a lot for the support and guidance from you both! Next up, docs and javadocs. |
As a note, we're already using this new |
...afka/src/main/java/org/springframework/kafka/listener/WakingKafkaConsumerTimingAdjuster.java
...g-kafka/src/main/java/org/springframework/kafka/listener/PartitionPausingBackoffManager.java
Create defaultFatalExceptionsList method in `ExceptionClassifier`
Change RetryTopicConfigurationSupport logic to provide a list instead of a configurer for non-blocking retries
Add KafkaBackOffManagerConfigurer to RetryTopicConfigurationSupport
Considering what was discussed with @artembilan, I've changed the I also created a @garyrussell, I've also added a Thanks! |
I've added documentation, changed the code examples to the new approach and reviewed the javadocs. Of course, that's a whole lot of text 😄 - there are probably some mistakes around we can fix as we go through the milestone cycle. But I think overall we can see in the code example diffs the improvements achieved with this approach. As far as I'm concerned this is good to go, so I'm removing the I think we look good for releasing this next week, so thank you very much for the support and guidance so far. As usual, let me know if there's anything else to be changed, or any further concerns. Thanks! |
spring-kafka/src/main/java/org/springframework/kafka/config/RetryTopicConfigurationSupport.java
I appreciate that @garyrussell, thanks. Merit goes to @artembilan for insisting on this approach, and then seeing it through - turned out a lot better than I expected! Also, thanks for your patience, some of this work is sometimes still a bit overwhelming for me. I appreciate the guidance and support from you both. I've addressed the review comments - please let me know if there's anything else. Thanks! |
@@ -87,5 +82,6 @@
@Target(ElementType.TYPE)
@Documented
@Import(RetryTopicConfigurationSupport.class)
public @interface EnableRetryTopic {
@EnableKafka
I don't think this has to be there. Or at least it has to be documented properly:
"For convenience the `@EnableKafkaRetryTopic` is marked also with the `@EnableKafka` since former cannot work without secondary"
(Or some other proper English language 😄 )
Sure. Now I'm thinking that since Boot's auto configuration already provides an `@EnableKafka` by default, maybe we'd run into the same issue of duplicated beans if users used this `@EnableKafkaRetryTopic` annotation?
I think that, considering this is a temporary solution until we add this as default in 3.0, maybe it'd be simpler to stick with the `@EnableRetryTopic` annotation, and then either move it to `@EnableKafka` or simply kill it and import the `Support` class directly for 3.0. WDYT?
No, there is no harm: `@EnableKafka` does a decent job of not duplicating beans. See its target `KafkaBootstrapConfiguration`.
I'm not sure that retry topics should be a default feature.
Therefore keeping `@EnableKafkaRetryTopic` is the way to go from now on.
Well, if we're introducing a new annotation then, and probably 99% of the users will be using it in a `Spring Boot` application anyway, I'd stick with the simpler `@EnableRetryTopic` annotation that should be enough for them, and just add a note in the documentation that if users are not in a `Spring Boot` app they should also add `@EnableKafka`.
But no worries, I'll keep it as is and we can revise this down the road if needed.
Thanks.
Just as a thought, of course we can look into this later, but what if for 3.0 we added this class to `@EnableKafka`, but also added a custom `@Condition` that would perform that annotation / bean lookup we had in the other PR, to only add these beans if there's at least one `RetryTopicConfiguration` or `@RetryableTopic` present?
This way we'd keep the automatic feature enabling users currently have, while not encumbering other users with this if they don't use it.
Thanks.
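The lookup described in this comment could look roughly like the sketch below. It covers only the annotation half of the idea, in plain Java with no Spring types; `Retryable`, `PlainListener`, `RetryingListener` and `retrySupportNeeded` are hypothetical names used for illustration, not anything from this PR or from Spring's condition API.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class ConditionalSupportSketch {

    // Hypothetical stand-in for a marker such as @RetryableTopic.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Retryable {
    }

    // A listener that does not use the retry feature.
    static class PlainListener {
        void listen(String message) {
        }
    }

    // A listener that opts in to the retry feature.
    static class RetryingListener {
        @Retryable
        void listen(String message) {
        }
    }

    // Returns true if any candidate class declares a method marked with @Retryable.
    static boolean retrySupportNeeded(Class<?>... candidates) {
        for (Class<?> candidate : candidates) {
            for (Method method : candidate.getDeclaredMethods()) {
                if (method.isAnnotationPresent(Retryable.class)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(retrySupportNeeded(PlainListener.class));
        System.out.println(retrySupportNeeded(PlainListener.class, RetryingListener.class));
    }
}
```

A real condition would also have to look for `RetryTopicConfiguration` beans, which this sketch leaves out.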
Well, that's not how other `@Enable...` features work. So, my vote is "No": if you are going to use `@RetryableTopic`, then add the respective `@Enable...`.
The condition could be considered in Spring Boot which indeed has some opinion.
The framework by itself has to be as straightforward as possible.
In the end it is just a library with minimal opinion 😉
Sure, makes sense. As you said in another post, eventually it'll be a good thing to better integrate this with `Spring Boot`'s auto configuration in the future. But since this approach won't be available for `Spring Boot` prior to 3.0, we don't really need to worry about it now.
Thanks.
I still want to see some mention of this meta-annotation in the Javadocs of this `EnableKafkaRetryTopic` class.
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
JavaUtils.INSTANCE.acceptIfNotNull(this.maxThreadPoolSize, taskExecutor::setMaxPoolSize);
taskExecutor.initialize();
super.addApplicationListener((ApplicationListener<ContextClosedEvent>) event -> taskExecutor.shutdown());
I am still in doubt about all of this stuff around the internal executor...
Can you, please, bring the bean back and let's revise it in a separate issue?
Sure, no worries, I'll bring it back and open the issue. Thanks.
I've moved it back - how grateful I am for `git` in these moments 😄
Also opened the issue: #2239
Please let me know if there's anything else.
Thanks.
@artembilan, just a friendly reminder, if nothing has changed, this is supposed to be on Monday's Of course, if you're not available to look at this soon enough, or if there's anything else, no worries. The two points I'm not 100% confident about this are:
I'm ok with leaving both as is, if you're ok with that. Please let me know if there's anything to be changed. Thanks. |
It is holiday season here for us, but I definitely will look shortly again on Monday if Gary insists on that release. Nevertheless I agree with Gary that annotation name has to be |
Sure, sorry, I didn't know you were on holidays. Also, after some more consideration, I think I'd be ok with this being merged to 3.0 instead - if you think this is good enough - and we let go of 2.9. Unfortunately, I'll need to change some work arrangements around here and I don't think I'll be able to keep up the pace of involvement with the project I've had for the past few months, so maybe it'd be better to have a longer cycle for this - at least as far as I'm concerned. Of course, that's @garyrussell's call. It's been a blast though, and thank you both so very much for everything! Of course, feel free to ping me if anything comes up that I can help with - I should be able to at least provide some input. Thanks again, and happy holidays!
We can be flexible when we release M1 since nothing depends on it; Monday is just a placeholder. |
Sure. As I mentioned though, I'll really need to shift my focus elsewhere beginning next week - I was counting on Friday's deadline for this. It's been more than a month since I started looking into these configuration issues, so I hope you consider that enough effort on my part. If there's anything else to be done, it would be great if you could just add a TBH, I still think we could have come up with some simple fix we could merge to As I said before, feel free to ping me if there's anything I can help with - I should be able to at least provide some input on retryable topic issues - and I will try to help out all around if and when possible. Thanks a lot for everything, you're both really great software engineers and this has been both an honor and a blast!
@@ -48,6 +48,11 @@ NOTE: You can have separate `ListenerContainerFactory` instances for the main an

==== Configuration

Since 2.9, the `@EnableKafkaRetryTopic` annotation should be used in a `@Configuration` annotated class instead of the regular `@EnableKafka` annotation, which it contains.
That's not a good way to say it: it gives the impression that we must not use `@EnableKafka` anymore.
And you didn't even introduce what `@EnableKafkaRetryTopic` is. The side effect with `@EnableKafka` could be explained at the end of this paragraph. Probably in a simple "NOTE: When `@EnableKafkaRetryTopic` is used, `@EnableKafka` can be omitted: the former is meta-annotated with the latter."
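To make the meta-annotation relationship concrete, here is a minimal plain-Java sketch. `EnableBase` and `EnableFeature` are hypothetical stand-ins for `@EnableKafka` and `@EnableKafkaRetryTopic`, and the one-level lookup in `isPresent` is a simplification for illustration; it is not how Spring resolves annotations internally.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for a base "enable" annotation such as @EnableKafka.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface EnableBase {
}

// Hypothetical stand-in for @EnableKafkaRetryTopic: it carries @EnableBase as a meta-annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@EnableBase
@interface EnableFeature {
}

// Only the composed annotation is declared on the configuration class.
@EnableFeature
class AppConfig {
}

class MetaAnnotationLookup {

    // Returns true if the type carries the annotation directly, or one level up
    // through a meta-annotation on one of its own annotations.
    static boolean isPresent(Class<?> type, Class<? extends Annotation> wanted) {
        if (type.isAnnotationPresent(wanted)) {
            return true;
        }
        for (Annotation declared : type.getAnnotations()) {
            if (declared.annotationType().isAnnotationPresent(wanted)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isPresent(AppConfig.class, EnableBase.class));
        System.out.println(isPresent(String.class, EnableBase.class));
    }
}
```

The point of the sketch is that `AppConfig` never mentions `EnableBase`, yet a lookup that follows meta-annotations still finds it, which is why the second annotation can be omitted.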
* @return the task executor.
*/
public TaskExecutor taskExecutor() {
return new ThreadPoolTaskExecutor();
Don't we provide a bean now?
What is the point of this factory method if end-user will have to just provide a respective bean to override a default one?
* is to extend directly from this class and override methods as necessary, remembering
* to add {@link Configuration @Configuration} to the subclass and {@link Bean @Bean}
* to overridden {@link Bean @Bean} methods. For more details see the javadoc of
* {@link EnableKafkaRetryTopic @EnableRetryTopic}.
Some typo. Or missed renaming...
return configurer.timingAdjustmentEnabled
? this.componentFactory.taskExecutor()
: task -> {
};
You can return `null` here instead if we just don't need such a bean.
We can't do that because it is a required dependency in the `kafkaConsumerBackoffManager` bean.
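The trade-off in this exchange (a no-op executor versus `null` for a required dependency) can be shown with a small sketch. It uses `java.util.concurrent.Executor` as a stand-in for Spring's `TaskExecutor`; `BackOffManager`, `chooseExecutor` and `runsWith` are hypothetical names, not the classes touched by this PR.

```java
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

class NoOpExecutorSketch {

    // Hypothetical component with a mandatory executor dependency.
    static final class BackOffManager {
        private final Executor executor;

        BackOffManager(Executor executor) {
            // A null executor would fail here, which is why null is not an option.
            this.executor = Objects.requireNonNull(executor, "executor must not be null");
        }

        void wakeUpLater(Runnable wakeUp) {
            this.executor.execute(wakeUp);
        }
    }

    // When the feature is off, hand out an executor that silently drops tasks
    // instead of returning null.
    static Executor chooseExecutor(boolean timingAdjustmentEnabled, Executor real) {
        Executor noOp = task -> { };
        return timingAdjustmentEnabled ? real : noOp;
    }

    // Sends one task through a manager built with the chosen executor and
    // reports how many times the task actually ran.
    static int runsWith(boolean timingAdjustmentEnabled) {
        AtomicInteger runs = new AtomicInteger();
        Executor direct = Runnable::run; // runs the task on the calling thread
        BackOffManager manager = new BackOffManager(chooseExecutor(timingAdjustmentEnabled, direct));
        manager.wakeUpLater(runs::incrementAndGet);
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runsWith(true));
        System.out.println(runsWith(false));
    }
}
```

With the no-op in place the dependent component needs no null checks; the cost is that dropped tasks fail silently, so the choice should stay tied to an explicit flag as in the diff above.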
private Consumer<CommonErrorHandler> errorHandlerCustomizer;
private Consumer<ConcurrentMessageListenerContainer<?, ?>> listenerContainerCustomizer;
private Consumer<DeadLetterPublishingRecoverer> deadLetterPublishingRecovererCustomizer;
Every member of the class has to be surrounded with blank lines (including the last one in the class).
Hi. Can anyone be so kind to explain to me again why we need the Thanks. |
Because this is more of an Apache Kafka feature and it has nothing to do with JMS or AMQP (or even Hazelcast) topics. So, such a generic name might lead to confusion for someone who is new to Kafka, but would rather expect a feature for his/her non-Kafka topics.
Sure, that sounds reasonable, thanks. |
Also, still counting on your kindness, if you could bear with me this one more time, my heart is still not at ease with our plan for (not) solving the configuration problems for 2.8.x. Again, I'm sure I'm missing something and would just like to know what. As far as I can see, in our current plan, we'll go throughout the year with users on the latest Then, if they don't just give up trying that, they'll look around on Stack Overflow and see either the workaround or our recommendation to manually upgrade to 2.9. So, overall, looks to me like a very frustrating path for our users, throughout the whole year and likely for much longer since most existing applications probably won't be able to upgrade to Spring 6.0 and Java 17 so easily to get a proper functional feature by default. If we had any plans to get back to boot's 2.x train, that would be less of a concern to me. What am I missing from this picture, please? Is being on the latest Boot version, doing exactly what's in the docs and getting random errors acceptable for such an important Spring library, for so long? Thanks. |
@tomazfernandes My apologies; I have not been closely involved with this PR; I thought it was generally to make it easier to customize the configuration of the feature. Which random, non-intelligible errors do you mean? |
Sure Gary, no worries. I think the proverbial case is as in this question: The 2.8.x user will do exactly what's in the documentation to configure, for example, the blocking exceptions. But note that this can happen on any feature configuration where the bean to be configured depends on other feature's beans.

    @Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
    public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
            DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
            @Qualifier(RetryTopicInternalBeanNames
                    .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
        ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
        lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
        lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
        return lcfc;
    }

Depending on the particular order things happen in the user's application, this might work, or it might throw a Then maybe the user finds that question on SO, and sees this workaround:

    @Configuration
    public static class SO71705876Configuration {
        @Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
        public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
                DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory) {
            ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, Clock.systemUTC());
            lcfc.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class);
            lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
            return lcfc;
        }
        @Bean(name = RetryTopicInternalBeanNames.KAFKA_CONSUMER_BACKOFF_MANAGER)
        public KafkaConsumerBackoffManager backOffManager(ApplicationContext context) {
            PartitionPausingBackOffManagerFactory managerFactory =
                    new PartitionPausingBackOffManagerFactory();
            managerFactory.setApplicationContext(context);
            return managerFactory.create();
        }
        @Bean(name = RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
        public DeadLetterPublishingRecovererFactory dlprFactory(DestinationTopicResolver resolver) {
            return new DeadLetterPublishingRecovererFactory(resolver);
        }
        @Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
        public DestinationTopicResolver destinationTopicResolver(ApplicationContext context) {
            return new DefaultDestinationTopicResolver(Clock.systemUTC(), context);
        }
    }

I wouldn't be too confident about this either TBH. But then, let's say we release 2.9, so the user is asked to manually override a dependency version to use this support approach - which indeed solves the problem. I think many users wouldn't feel so confident about doing that manual override either. And by this moment, probably many of them will have given up already. Unless I'm missing something, that's the user experience we'll be providing for the entire year, and, as I've stated, for even longer. Note that the user didn't do anything wrong - they're on the latest and greatest Spring Boot version, using the proper Spring Boot-defined SK dependency version, and just trying to use something exactly as it's documented. Of course, as I mentioned, maybe I'm missing something, or exaggerating. I just really want to understand why this is not a problem we should give a better solution to. Thanks.
To solve the lazy bean creation, we should be able to backport just the Am I missing something? |
You even suggested that here. |
I’m not sure I can visualize exactly what’s on your mind. From RT’s perspective this should work. My concerns would be:
|
The other options I see would be:
|
Right. That was one of my biggest concerns for the original solution: end-user beans cannot contribute to the infrastructure if they also depend on some infrastructure. I believe we stated somewhere that retryable topics are an experimental feature. So, it is expected that something is not going to work. Therefore an I am still against pushing this PR back to
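The concern about end-user beans depending on infrastructure is what a support class with overridable hooks avoids: the framework creates the components and only calls back into user code for settings. The following is a plain-Java sketch of that pattern under stated assumptions; `BlockingRetries`, `ContainerConfigurer`, `ConfigurationSupport` and `MyConfiguration` are hypothetical names and do not reproduce the actual `RetryTopicConfigurationSupport` API.

```java
import java.util.ArrayList;
import java.util.List;

class SupportClassSketch {

    // Hypothetical settings object that the support class passes to user code.
    static final class BlockingRetries {
        final List<Class<? extends Exception>> retryOn = new ArrayList<>();
        long backOffMillis = 0L;
    }

    // Hypothetical infrastructure component that only the framework constructs.
    static final class ContainerConfigurer {
        final List<Class<? extends Exception>> retryOn;
        final long backOffMillis;

        ContainerConfigurer(BlockingRetries settings) {
            this.retryOn = List.copyOf(settings.retryOn);
            this.backOffMillis = settings.backOffMillis;
        }
    }

    // The support class owns component creation and exposes a hook for customization.
    static class ConfigurationSupport {

        // Hook: subclasses override this; the default changes nothing.
        protected void configureBlockingRetries(BlockingRetries settings) {
        }

        // Framework side: create the settings, let the hook adjust them, then build.
        final ContainerConfigurer containerConfigurer() {
            BlockingRetries settings = new BlockingRetries();
            configureBlockingRetries(settings);
            return new ContainerConfigurer(settings);
        }
    }

    // User code: no infrastructure dependencies to wire, so creation order cannot matter.
    static class MyConfiguration extends ConfigurationSupport {
        @Override
        protected void configureBlockingRetries(BlockingRetries settings) {
            settings.retryOn.add(IllegalStateException.class);
            settings.backOffMillis = 500L;
        }
    }

    public static void main(String[] args) {
        ContainerConfigurer configurer = new MyConfiguration().containerConfigurer();
        System.out.println(configurer.retryOn + " " + configurer.backOffMillis);
    }
}
```

Compared with redefining an infrastructure bean, the user never asks the container for other infrastructure beans, so the lazy-creation ordering problem discussed above does not arise in this shape.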
I agree; that's not up for debate; I was just looking to see if there was some middle ground for 2.8.x users; if not, so be it. |
Thanks a lot for your answers, I really appreciate that. I do think that, if something can be done to reduce user frustration on the Also, I think that this became a bigger issue mostly because I showed up out of the blue a few months ago, started pushing code, and by the time we realized we'd need a new minor version it was too late to get into Another important thing for us to figure out is whether this is a production-ready, robust and complete feature, as the experimental disclaimer states, and that deserves proper support as such, or if it's a fundamentally flawed experimental feature that the only worthy thing to do is to focus on rebuilding it for I think we're on a good track overall, we did some really nice work so far, and as we get to know each other better we should be able to reduce friction in complex issues such as this. I'll try to look into this PR sometime later this week, or early next. Or, if you prefer, you can add that polish commit to release As I've stated, I shouldn't be able to focus as much on this project as I had been lately. But I'll try to help out as much as I can. Either way, I'm deeply grateful for all we've been able to do so far, and please let me know if there's anything else I can help with. Thanks! |
Thanks @tomazfernandes I will take care of @artembilan 's latest comments in a polishing commit; I plan on releasing 2.9.0-M1 tomorrow, now. Thanks for all your hard work on this. |
Polishing commit: garyrussell@db68ab7 |
Merged as b2beec3 and cherry-picked to 2.9.x |
Resolves #2226
Also resolves #2174
This is a first draft of a solution with a `RetryTopicConfigurationSupport`, so we can evaluate if it's going in the right direction before doing a final review, adding docs, etc.
Overall, I liked it better than I thought I would - the bean count has been reduced to 3 or 4 beans, and it gave a lot of clarity on the feature's design. It should make it easier for everyone to understand how the feature works. So thanks a lot @artembilan for insisting on this.
To be honest, I still think this might be a bit overwhelming for the average end-user to configure simple features, but if that's a real problem we can look into providing an easier alternative for some features in the future. It's definitely much better than what we have now, and a solid foundation to build upon.
There are a few decisions I made that are not exactly `WebMvcSupport`-like - I'm ok with rolling any or all of them back if that's better. For example, I:
- added a `RetryTopicComponentsFactory` to reduce the number of methods to be overridden in the `Support` class and keep it overall smaller;
- added a `KafkaBackOffManagerConfigurationSupport` because the `KafkaBackOffManager` is useful outside of Retry Topic's scope;
- added an `@EnableRetryTopic` annotation - while we're using this as an optional add-on, I think it's easier to document adding the annotation than the `@Import` alternative - afterwards we can move this annotation to `@EnableKafka`, or maybe kill it in 3.0.0. It also imports the `KafkaBackOffManagerConfigurationSupport` class;
- added a `RetryTopicBeanNames` class and deprecated the `RetryTopicInternalBeanNames` class - since we're using far fewer beans, there's no need for so many names. Also this enables setting the names to the framework standards. For now the old names should still work - I need to do a more thorough review to ensure that. If that seems too risky, I'm ok with keeping the old names until 3.0.
- added `ApplicationContextAware` to some beans instead of injecting an `ApplicationContext` through the constructor. But I think we can review this in the future.
Also, after more consideration, although I don't see any real risk since we're not actually changing any legacy logic besides some optional bean names which we can rollback, I think maybe it's indeed better for us to hold this for 2.9 - time is flying by and we should release it soon enough. Of course, I'm also ok with releasing it as 2.8.x if that's better.
I used this new configuration support for the `RetryTopicIntegrationTests` and `RetryTopicExceptionRoutingIntegrationTests`, so these are good places to take a look into what it's like. I also made sure the former works with legacy configuration.
Let me know what you think about this - hopefully this will be a solid step in the right direction for the RT feature.
Thanks!