Revise RetryTopicConfigurationSupport TaskExecutor Strategy #2239

Closed
tomazfernandes opened this issue Apr 20, 2022 · 30 comments

Comments

@tomazfernandes
Contributor

Currently we add a ThreadPoolTaskExecutor @Bean in RetryTopicConfigurationSupport. That might have undesirable side effects, such as making Spring Boot's auto-configuration back off from providing a default TaskExecutor for users.

We should investigate this further and if necessary look for alternatives.

@tomazfernandes
Contributor Author

I started looking into this, and TBH I think the scenario is slightly worse than I'd thought.

It turns out that when there is a TaskExecutor bean in the context, Boot's auto-configured task executor does back off, as we supposed it would. That might already be bad enough, since auto-configuration enables many task executor configuration options that users may be relying on.

But what I think is the worst part is that there's logic in WebMvcAutoConfiguration#237 that tries to fetch the auto-configured task executor by name. Since it's not there when we provide our own, it falls back to a SimpleAsyncTaskExecutor for the RequestMappingHandlerAdapter, which is likely much less efficient than the default ThreadPoolTaskExecutor, since it doesn't reuse threads.

I think this is really unexpected and weird behavior, but at least in my test application with Boot 2.6.6 that's how it goes.

I'm not sure what a good move would be here. The least impactful option I can think of would be to go back to letting the WakingTimingAdjuster manage the TaskExecutor lifecycle, and maybe even have it instantiate the executor lazily on the first timing adjustment request.

Any thoughts?

Thanks.

@artembilan
Member

You know, I think the SimpleAsyncTaskExecutor would be a good compromise for the default setting.
I see we use it in many places across the Spring projects.
With a SimpleAsyncTaskExecutor we won't need to worry about its lifecycle.
The end user would still have the option to inject other TaskExecutor implementations.

@tomazfernandes
Contributor Author

Oh, that sounds great, really. TBH, I have no idea how frequently this timing adjuster kicks in in real-life scenarios; it depends on too many factors. But as an overall default dependency for the feature, a full-fledged ThreadPoolTaskExecutor does seem like too much.

I'll do some testing to see if it has any impact on actual precision and submit a PR with this change.

Please let me know if you think of anything else.

Thanks!

@garyrussell
Contributor

garyrussell commented May 2, 2022

Actually, a SATE (SimpleAsyncTaskExecutor) is probably not a good fit here, especially with long delays; we could end up with many threads sleeping.

On the other hand, too small a pool could cause a short delay to be queued behind longer delays for other partitions.

Maybe in 3.0, we should look at changing the logic to use a TaskScheduler instead, rather than sleeping the executor thread?
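For illustration, a minimal JDK-only sketch of why a scheduler sidesteps both concerns. It assumes `java.util.concurrent.ScheduledExecutorService` as a stand-in for Spring's TaskScheduler (Spring's ThreadPoolTaskScheduler is itself backed by a ScheduledExecutorService); the class and partition names are hypothetical. Because a pending task waits in the scheduler's queue rather than in `Thread.sleep()`, one thread can serve both delays, and a short delay submitted after a long one still fires first.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ScheduledResumeDemo {

    /** Schedules two hypothetical partition resumes and returns the order in which they ran. */
    static List<String> run() {
        List<String> resumed = new CopyOnWriteArrayList<>();
        // One thread is enough: pending tasks wait in the scheduler's delay queue, not in Thread.sleep().
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(2);
        try {
            // Long delay submitted first...
            scheduler.schedule(() -> { resumed.add("partition-0 (long)"); done.countDown(); },
                    300, TimeUnit.MILLISECONDS);
            // ...short delay submitted second, yet it is not stuck behind the long one.
            scheduler.schedule(() -> { resumed.add("partition-1 (short)"); done.countDown(); },
                    50, TimeUnit.MILLISECONDS);
            done.await(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally {
            scheduler.shutdown();
        }
        return resumed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With a sleep-based design on a single thread, the short delay would instead have to wait until the long one finished sleeping.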

@tomazfernandes
Contributor Author

Hmm, I wasn't familiar with this TaskScheduler abstraction; it looks interesting! I'll take a closer look, but it does seem like a more appropriate approach than the current sleeping solution.

For clarification, the amount of time spent sleeping isn't actually related to the delay size. Every time we receive a PartitionIdleEvent for a backed-off partition, we check whether we're within 2 x pollTimeout of the dueTimestamp (but more than one pollTimeout away). If we are, we calculate how far off we are from arriving at the due time in whole increments of pollTimeout (long adjustmentAmount = timeUntilDue % pollTimeout), and then we sleep for that difference.

Say pollTimeout is 5000 ms and we're 5500 ms away from the due time: we'll sleep for only 500 ms. Of course, the larger the pollTimeout, the bigger the adjustment can get.

	public long adjustTiming(Consumer<?, ?> consumerToAdjust, TopicPartition topicPartition,
			long pollTimeout, long timeUntilDue) {
		// Only adjust when the due time is more than one, but at most N, poll timeouts away
		boolean isInAdjustmentWindow = timeUntilDue > pollTimeout && timeUntilDue <=
				pollTimeout * this.pollTimeoutsForAdjustmentWindow;
		// Sleeping this long leaves a whole number of poll timeouts until the due time
		long adjustmentAmount = timeUntilDue % pollTimeout;
		if (isInAdjustmentWindow && adjustmentAmount > this.timingAdjustmentThreshold.toMillis()) {
			// The sleep-then-wake runs on the executor thread, not on the consumer thread
			this.taskExecutor.execute(() ->
					doApplyTimingAdjustment(consumerToAdjust, topicPartition, adjustmentAmount));
			return adjustmentAmount;
		}
		return 0L;
	}
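To make the arithmetic concrete, here is a standalone sketch of the same calculation without the consumer or the executor. It assumes the defaults mentioned in this thread (a window of 2 x pollTimeout and a 100 ms threshold); the class and constant names are made up for illustration.

```java
final class TimingAdjustmentMath {

    // Assumed defaults, as described in this thread.
    static final int POLL_TIMEOUTS_FOR_ADJUSTMENT_WINDOW = 2;
    static final long TIMING_ADJUSTMENT_THRESHOLD_MS = 100L;

    /** Returns how long the waker would sleep before waking the consumer, or 0 if no adjustment applies. */
    static long adjustmentFor(long pollTimeout, long timeUntilDue) {
        boolean isInAdjustmentWindow = timeUntilDue > pollTimeout
                && timeUntilDue <= pollTimeout * POLL_TIMEOUTS_FOR_ADJUSTMENT_WINDOW;
        // Remainder after whole poll timeouts: sleeping this long aligns the next poll with the due time.
        long adjustmentAmount = timeUntilDue % pollTimeout;
        return (isInAdjustmentWindow && adjustmentAmount > TIMING_ADJUSTMENT_THRESHOLD_MS)
                ? adjustmentAmount
                : 0L;
    }

    public static void main(String[] args) {
        System.out.println(adjustmentFor(5000, 5500));  // the example above: 500
        System.out.println(adjustmentFor(5000, 5050));  // 50 ms is below the threshold: 0
        System.out.println(adjustmentFor(5000, 12000)); // outside the 2 x pollTimeout window: 0
    }
}
```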

A different solution might be having a single thread that runs through a list and wakes up the proper consumers at the proper time, but it seems the TaskScheduler would be simpler.
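The single-thread alternative could be sketched with `java.util.concurrent.DelayQueue`, so that one thread only wakes up when the earliest request is due. This is a hypothetical, JDK-only sketch: `WakeUp` and the partition strings are illustrative, and a real version would wake the relevant consumer instead of recording a name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class SingleThreadWaker {

    // A hypothetical "wake this partition's consumer at dueAtNanos" request.
    static final class WakeUp implements Delayed {
        final String partition;
        final long dueAtNanos;

        WakeUp(String partition, long delayMillis) {
            this.partition = partition;
            this.dueAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Drains the queue on the calling thread and returns partitions in the order they were woken. */
    static List<String> runAll(List<WakeUp> requests) {
        DelayQueue<WakeUp> queue = new DelayQueue<>(requests);
        List<String> woken = new ArrayList<>();
        try {
            while (!queue.isEmpty()) {
                // take() blocks only until the earliest entry is due.
                woken.add(queue.take().partition);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return woken;
    }

    public static void main(String[] args) {
        System.out.println(runAll(List.of(new WakeUp("orders-0", 200), new WakeUp("orders-1", 20))));
    }
}
```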

@tomazfernandes
Contributor Author

Also, there's a minimum threshold of 100 ms; adjustments below that won't be triggered. All these numbers are configurable, but I think most users shouldn't really toy with them, which is why I didn't add them to the docs.

@tomazfernandes
Contributor Author

About this, considering all that's been said, I think it's worth using the SimpleAsyncTaskExecutor for 2.9, since we'd then no longer prevent Spring Boot's Executor from being auto-configured correctly. Also, if I understand correctly, we'd no longer need to manage the Executor lifecycle.

Another option would be registering the WakingTimingAdjuster as a bean instead of the TaskExecutor, and having it create and manage the Executor's lifecycle directly.

For 3.0 we can change the implementation to rely on the TaskScheduler instead.

WDYT?

Thanks


@artembilan
Member

Even if a TaskScheduler solution is better, we still have a problem with abusing auto-configuration. It is exactly the same problem we discussed above for a TaskExecutor.

I guess I need to investigate further how Spring Boot deals with multiple TaskExecutor/TaskScheduler beans.

I recently changed Spring Integration to rely on the auto-configured TaskScheduler, but I cannot say yet what is going on with the AbstractMessageBrokerConfiguration and its TaskExecutor/TaskScheduler beans...

@tomazfernandes
Contributor Author

How about the idea of turning the WakingTimingAdjuster into a bean and having it manage the TaskExecutor directly? That way we keep the auto-configuration issues out of it, and we can change the implementation later.

The issue I see with that is that currently we have an option of not having a TimingAdjuster - not sure if we can still have that with a @Bean declaration without using some @Conditional.

@artembilan
Member

Well, the bean method can return null. Therefore a dependency on this bean can be marked with @Nullable.

@tomazfernandes
Contributor Author

Nice, I wasn't sure if that was a proper solution, thanks. Since we have these auto-configuration problems, with some not-so-great consequences, I think it's worth shielding the solution from them. We can always revise it later for 3.0 if we need to.

If that sounds good, I can open a PR for us to look at, probably with the TimingAdjuster implementing InitializingBean and DisposableBean, and being registered as a @Bean in the RetryTopicConfigurationSupport class.

@artembilan
Member

See the same WebSocketConfigurationSupport:

	@Bean
	@Nullable
	public TaskScheduler defaultSockJsTaskScheduler() {
		if (initHandlerRegistry().requiresTaskScheduler()) {
			ThreadPoolTaskScheduler threadPoolScheduler = new ThreadPoolTaskScheduler();
			threadPoolScheduler.setThreadNamePrefix("SockJS-");
			threadPoolScheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
			threadPoolScheduler.setRemoveOnCancelPolicy(true);
			this.scheduler = threadPoolScheduler;
		}
		return this.scheduler;
	}

Where this.scheduler is null by default.

However I see in the latest version they did something like this:

	@Bean
	DefaultSockJsSchedulerContainer defaultSockJsSchedulerContainer() {
		return (initHandlerRegistry().requiresTaskScheduler() ?
				new DefaultSockJsSchedulerContainer(initDefaultSockJsScheduler()) :
				new DefaultSockJsSchedulerContainer(null));
	}

The issue is exactly the same as the one we are discussing here: spring-projects/spring-framework#27903

@tomazfernandes
Contributor Author

Nice, thanks. They seem to have created a wrapper, perhaps to escape that problem. We already have a BackOffManagerConfigurer option to disable TimingAdjustment, so it shouldn't be too hard.

I didn't see anyone in that thread address the fact that WebMvcAutoConfiguration seems to fall back to a SATE if any TaskExecutor is declared as a bean with a different name, though. Maybe it's worth commenting on?

@artembilan
Member

It comes with this:

@AutoConfiguration(after = { DispatcherServletAutoConfiguration.class, TaskExecutionAutoConfiguration.class,
		ValidationAutoConfiguration.class })

So, the WebMvcAutoConfiguration really relies on the auto-configured TaskExecutor.
I guess nothing to comment. 😄

@tomazfernandes
Contributor Author

Sorry, I'm not sure I follow... What I mean is, in TaskExecutionAutoConfiguration.java the TaskExecutor bean is ConditionalOnMissingBean - so it backs off when any TaskExecutor is provided.

Then in WebMvcAutoConfiguration.java, the logic tries to fetch the TaskExecutor by name - so if any TaskExecutor bean with a different name has been provided it falls back to a SATE.
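A deliberately simplified, JDK-only model of the interaction described above, for illustration only: a `Map` stands in for the application context, `Runnable::run` stands in for the pooled executor, and the class and method names are hypothetical. The bean name `"applicationTaskExecutor"` is assumed to be the value of `TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME` in Boot 2.x.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;

final class ExecutorBackOffModel {

    // Assumed value of TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME.
    static final String APPLICATION_TASK_EXECUTOR = "applicationTaskExecutor";

    // Models @ConditionalOnMissingBean: the default is only registered if no Executor bean exists yet.
    static void taskExecutionAutoConfiguration(Map<String, Object> context) {
        boolean anyExecutor = context.values().stream().anyMatch(bean -> bean instanceof Executor);
        if (!anyExecutor) {
            Executor pooled = Runnable::run; // placeholder for Boot's ThreadPoolTaskExecutor
            context.put(APPLICATION_TASK_EXECUTOR, pooled);
        }
    }

    // Models the by-name lookup: an Executor registered under any other name is ignored.
    static String mvcAsyncExecutor(Map<String, Object> context) {
        return context.get(APPLICATION_TASK_EXECUTOR) instanceof Executor
                ? "auto-configured pool"
                : "SimpleAsyncTaskExecutor fallback";
    }

    public static void main(String[] args) {
        Map<String, Object> plainBoot = new HashMap<>();
        taskExecutionAutoConfiguration(plainBoot);
        System.out.println(mvcAsyncExecutor(plainBoot)); // auto-configured pool

        Map<String, Object> withLibraryExecutor = new HashMap<>();
        withLibraryExecutor.put("someLibraryTaskExecutor", (Executor) Runnable::run); // hypothetical bean
        taskExecutionAutoConfiguration(withLibraryExecutor); // backs off
        System.out.println(mvcAsyncExecutor(withLibraryExecutor)); // SimpleAsyncTaskExecutor fallback
    }
}
```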

Maybe that's documented somewhere and not a real issue; as a user, though, I was surprised by this side effect, especially since, if I understand correctly, that's the TaskExecutor that will be used to handle incoming requests.

Now that I think about it, that would also mean that if users inject the provided TaskExecutor and use it for business logic, they'll be competing with request handling for the same executor.

I guess I'm probably missing something 😄

@tomazfernandes
Contributor Author

I guess this is it:

AsyncSupportConfigurer#setTaskExecutor

The provided task executor is used to:
Handle Callable controller method return values.
Perform blocking writes when streaming to the response through a reactive (e.g. Reactor, RxJava) controller method return value.
By default only a SimpleAsyncTaskExecutor is used. However when using the above two use cases, it's recommended to configure an executor backed by a thread pool such as ThreadPoolTaskExecutor.
Params:
taskExecutor – the task executor instance to use by default

So this executor serves a fairly niche use case, although the Javadoc is perhaps a bit misleading, since in a Boot app the auto-configured ThreadPoolTaskExecutor is used by default. But nothing to write home about, I guess.

@artembilan
Member

The point is that, either way, WebMvc does not provide any extra TaskExecutor bean: it either uses the TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME bean or falls back to the SATE.
I guess the idea is to use a single managed TaskExecutor throughout the whole Spring Boot application.
That's why I don't want to expose TaskExecutor beans from our side either.

@tomazfernandes
Contributor Author

Sure, makes sense. Also, users relying on injecting the auto configured bean would get a different TaskExecutor instead. I'll implement something along the lines you mentioned before, with the nullable TimingAdjuster bean instead of the TaskExecutor.

@garyrussell
Contributor

I think I have figured out how we can resolve this using the new BackOffHandler concept.

Expect a PR in the next day or so.

@tomazfernandes
Contributor Author

Sounds great @garyrussell, thanks! Looking forward to it.

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jun 28, 2022
New back-off manager (and factory) that uses a task scheduler to resume
the paused partitions.
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jun 29, 2022
New back-off manager (and factory) that uses a task scheduler to resume
the paused partitions.

Revert change to deprecated PartitionPausingBackoffManager.

Log resume.
@garyrussell
Contributor

Thanks; good to know the integration tests still work. I will probably just remove the unit tests for the deprecated components.

> not all bad I guess.

This is in no way a criticism of your implementation or the necessary addition of the timing adjustment; just evolution.

> remove the pollTimeout and partitionIdleInterval ...

Yes; I will leave the events though; someone might still want them.

Given that the doc still says

> usual rule of no breaking API changes does not apply to this feature until the experimental designation is removed.

I wonder if we can just go ahead and remove everything instead of deprecating it, even in 2.9? @artembilan WDYT?

The wrapper should probably be moved to a top level class; I added it so we don't interfere with Boot's auto configured scheduler if Spring Integration and/or @EnableScheduling are being used, and the user wants to use a specific scheduler for this.

I don't think we should auto-configure one. The way I've structured it, we use the wrapper if present; otherwise, a single scheduler bean if there is exactly one; otherwise, if there is more than one, Boot's auto-configured scheduler, if present.

I think we just need to document that fact.
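The resolution order described above, restated as a small JDK-only sketch so the precedence is unambiguous. This models the rule as stated in this comment, not the actual implementation; the wrapper marker is hypothetical, and `"taskScheduler"` is assumed to be the name of Boot's auto-configured scheduler bean.

```java
import java.util.List;

final class RetryTopicSchedulerResolution {

    // Hypothetical marker for "the user declared the dedicated wrapper bean".
    static final String WRAPPER = "<wrapper>";
    // Assumed name of Boot's auto-configured scheduler bean.
    static final String BOOT_AUTO_CONFIGURED = "taskScheduler";

    /** Returns the scheduler to use, or null when no choice can be made. */
    static String resolve(boolean wrapperPresent, List<String> schedulerBeanNames) {
        if (wrapperPresent) {
            return WRAPPER; // 1. an explicit wrapper always wins
        }
        if (schedulerBeanNames.size() == 1) {
            return schedulerBeanNames.get(0); // 2. a single scheduler bean is unambiguous
        }
        if (schedulerBeanNames.contains(BOOT_AUTO_CONFIGURED)) {
            return BOOT_AUTO_CONFIGURED; // 3. several beans: prefer Boot's auto-configured one
        }
        return null; // none, or several without Boot's
    }

    public static void main(String[] args) {
        System.out.println(resolve(false, List.of("mine", "taskScheduler"))); // taskScheduler
    }
}
```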

Thanks for taking the time to review; I will start the clean up later today, but probably won't submit a PR until next week (and we have a holiday Monday). I am planning on releasing 2.9.0-RC1 next week; with GA a couple of weeks later.

Let me know if there is anything else we should consider for the RC.

@artembilan
Member

I agree that we can go ahead with removing the redundant API, even in 2.9. It should not be a big breaking change, since 2.9 is exactly where we have introduced some new stuff anyway.

garyrussell reopened this Jun 29, 2022
@garyrussell
Contributor

I added the code purging commit to my branch https://github.com/garyrussell/spring-kafka/commits/GH-2239

Tests are all passing.

Let me know if you think I have been too aggressive, or if I have missed anything.

I am off work until Tuesday.

Docs and PR to follow next week.

I left decorateFactoryWithoutSettingContainerProperties for now.

@tomazfernandes
Contributor Author

This looks good, but just to make sure we're on the same page regarding breaking changes: with this, not only will users need to add the @EnableKafkaRetryTopic annotation to make it work, but any feature configuration they may have done by injecting infrastructure beans, following the documentation, will stop working as well. This applies, for example, to blocking retries, fatal exception classification, etc.

Of course, if that's not a concern for you, it's not for me either; we do provide the same features through different configuration, so maybe it could be considered just an API change.

One thing I didn't really understand is this:

Assert.state(ONLY_ONE_ALLOWED.getAndSet(false), "Only one 'RetryTopicConfigurationSupport' is allowed");

Also, as long as we're removing support for legacy bootstrapping, we can also remove support for legacy bean names in KLABPP (KafkaListenerAnnotationBeanPostProcessor) and keep only the constant:

	return this.beanFactory.containsBean("internalRetryTopicConfigurer")
			? this.beanFactory.getBean("internalRetryTopicConfigurer", RetryTopicConfigurer.class)
			: this.beanFactory.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class);

The ListenerContainerFactoryResolver has a similar situation with legacy bean names as strings.

@garyrussell
Contributor

I would normally not make such breaking changes in a minor release, but we were up front in the documentation that, while the feature is stable and supported, the APIs were subject to change. So I believe we are justified.

To be honest, I am glad that I added that to the docs; when you first submitted the original PR, the amount of new code was so overwhelming that a normal code review could never fully cover all the bases, so I suspected that we might need some breaking changes as usage increased.

If you feel it's too much, we can defer the second commit until 3.0. I am ok with that too; I would just need to pull some of the polishing into 2.9.

Please discuss with @artembilan while I am away.

The reason I added the "only one allowed" check is because, when I was migrating the tests, I added extends RetryTopicConfigurationSupport here...

https://github.com/garyrussell/spring-kafka/blob/9e82f6a386861404b7c60c48a172006d0077b2e8/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java#L318

...having not noticed there was already one here...

https://github.com/garyrussell/spring-kafka/blob/9e82f6a386861404b7c60c48a172006d0077b2e8/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java#L392

With that in place, the blocking retry tests all failed because the overridden method to set up the exceptions was not called.

https://github.com/garyrussell/spring-kafka/blob/9e82f6a386861404b7c60c48a172006d0077b2e8/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java#L394-L399

Perhaps I used a sledge hammer to crack a tiny nut; if you can think why that might have happened (and we can detect and correct it somehow), I would be happy to remove the check.

That said, Boot no longer allows bean overrides by default; having 2 or more of them in a Boot app won't work because of the bean declarations in the super class (without allowing bean overrides in Boot config). With this technique, we will fail fast with a meaningful message.
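The fail-fast guard boils down to the pattern below, sketched without Spring (Spring's `Assert.state` throws an `IllegalStateException` in the same way); the class name is hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class OnlyOneAllowedDemo {

    // JVM-wide flag: true until the first instance claims it.
    private static final AtomicBoolean ONLY_ONE_ALLOWED = new AtomicBoolean(true);

    OnlyOneAllowedDemo() {
        // getAndSet(false) returns true exactly once; every later call returns false.
        if (!ONLY_ONE_ALLOWED.getAndSet(false)) {
            throw new IllegalStateException("Only one 'RetryTopicConfigurationSupport' is allowed");
        }
    }

    public static void main(String[] args) {
        new OnlyOneAllowedDemo(); // first instance: fine
        try {
            new OnlyOneAllowedDemo(); // second instance: fails fast
        }
        catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Since the flag is static, it is shared by every application context in the same JVM (for example, several cached test contexts), which is worth keeping in mind with this approach.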

@garyrussell
Contributor

Another reason the change is OK in 2.9 is that nobody is forced to use it: it's opt-in, Boot won't pull it in, and the 3.2.0 kafka-clients work fine with 2.8 if there's a compelling reason to use them.

@tomazfernandes
Contributor Author

> I would normally not make such breaking changes in a minor release, but we were up front in the documentation that, while the feature is stable and supported, the APIs were subject to change. So I believe we are justified.

Sure, no worries, I just wanted to make sure you're aware that existing configurations would break too.

> To be honest, I am glad that I added that to the docs; when you first submitted the original PR, the amount of new code was so overwhelming that a normal code review could never fully cover all the bases, so I suspected that we might need some breaking changes as usage increased.

Sure, I realize that, and do appreciate a lot you merging the code in such conditions. This opened a whole universe I didn't know existed and I'm really grateful for that.

> If you feel it's too much, we can defer the second commit until 3.0. I am ok with that too; I would just need to pull some of the polishing into 2.9.

Not necessary for me - let's break some walls!

> Please discuss with @artembilan while I am away.

Looks like we're all on the same page, but if anything comes up I'll bring it up, and I'll be available if anything is needed.

> With that in place, the blocking retry tests all failed because the overridden method to set up the exceptions was not called.
> Perhaps I used a sledge hammer to crack a tiny nut; if you can think why that might have happened (and we can detect and correct it somehow), I would be happy to remove the check.

Looks like the configured beans were overridden by new ones, right?

> That said, Boot no longer allows bean overrides by default; having 2 or more of them in a Boot app won't work because of the bean declarations in the super class (without allowing bean overrides in Boot config). With this technique, we will fail fast with a meaningful message.

Sure, no worries, we can leave it that way, I was just curious.

Thanks a lot for looking into this! And let me know if there's anything else I can help with.

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jul 5, 2022
New back-off manager (and factory) that uses a task scheduler to resume
the paused partitions.

Revert change to deprecated PartitionPausingBackoffManager.

Log resume.
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jul 5, 2022
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jul 6, 2022
New back-off manager (and factory) that uses a task scheduler to resume
the paused partitions.

Revert change to deprecated PartitionPausingBackoffManager.

Log resume.
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jul 6, 2022
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jul 7, 2022
New back-off manager (and factory) that uses a task scheduler to resume
the paused partitions.

Revert change to deprecated PartitionPausingBackoffManager.

Log resume.
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jul 7, 2022
artembilan pushed a commit that referenced this issue Jul 7, 2022
Resolves #2239

* GH-2239: Replace PartitionPausingBackOffManager
New back-off manager (and factory) that uses a task scheduler to resume
the paused partitions.
Revert change to deprecated PartitionPausingBackoffManager.
Log resume.
* Remove legacy code.
Also fix unrelated race in EKIT.
Only allow one `RetryTopicConfigurationSupport` bean.
* Fix static var.
* Docs.
* More docs.
* Remove more dead/deprecated code.
* Address PR Comments.
* Fix RetryTopicConfigurer bean retrieval.
* Remove unnecessary casts in doc.
# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java
#	spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicInternalBeanNames.java
#	spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java
artembilan added a commit to artembilan/spring-kafka that referenced this issue Jul 7, 2022
Related to spring-projects#2239

**Cherry-pick to `2.9.x`**
garyrussell pushed a commit that referenced this issue Jul 7, 2022
Related to #2239

**Cherry-pick to `2.9.x`**
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jul 11, 2022
See spring-projects#2239

The previous commit removed the implicit bootstrap in favor of requiring the
user to use `@EnableKafkaRetryTopic` or explicitly extend
`RetryTopicConfigurationSupport`.

Unfortunately, this breaks Spring Boot because it can auto configure a
`RetryTopicConfiguration` bean, which means the infrastructure beans are required.

Fallback to late binding of the infrastructure beans if a `RetryTopicConfiguration`
bean is found in the application context.

**cherry-pick to main**
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jul 11, 2022
See spring-projects#2239

The previous commit removed the implicit bootstrap in favor of requiring the
user to use `@EnableKafkaRetryTopic` or explicitly extend
`RetryTopicConfigurationSupport`.

Unfortunately, this breaks Spring Boot because it can auto configure a
`RetryTopicConfiguration` bean, which means the infrastructure beans are required.

Fallback to late binding of the infrastructure beans if a `RetryTopicConfiguration`
bean is found in the application context.

Tested with a Boot app.

**cherry-pick to main**
artembilan pushed a commit that referenced this issue Jul 11, 2022
See #2239

The previous commit removed the implicit bootstrap in favor of requiring the
user to use `@EnableKafkaRetryTopic` or explicitly extend
`RetryTopicConfigurationSupport`.

Unfortunately, this breaks Spring Boot because it can auto configure a
`RetryTopicConfiguration` bean, which means the infrastructure beans are required.

Fallback to late binding of the infrastructure beans if a `RetryTopicConfiguration`
bean is found in the application context.

Tested with a Boot app.

**cherry-pick to main**
3 participants