-
Notifications
You must be signed in to change notification settings - Fork 38.5k
Propagate CoroutineContext
in reactive transaction
#27308
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@ks-yim Please sign the Contributor License Agreement! Click here to manually synchronize the status of this Pull Request. See the FAQ for frequently asked questions. |
@ks-yim Thank you for signing the Contributor License Agreement! |
b7498d6
to
10d600a
Compare
10d600a
to
09c64ae
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will have a deeper look to the implementation, but looks like we have something to fix here indeed.
@@ -71,7 +75,9 @@ | |||
public static Publisher<?> invokeSuspendingFunction(Method method, Object target, Object... args) { | |||
KFunction<?> function = Objects.requireNonNull(ReflectJvmMapping.getKotlinFunction(method)); | |||
KClassifier classifier = function.getReturnType().getClassifier(); | |||
Mono<Object> mono = MonoKt.mono(Dispatchers.getUnconfined(), (scope, continuation) -> | |||
Continuation<?> cont = (Continuation<?>) args[args.length - 1]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not super confortable doing this kind of manual handling of the continuation, so I have asked guidance to Kotlin team to see if there is a better way, waiting there feedback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feedback from Kotlin team:
What puzzles me is why there is
Job
in the context.
When you have one, it means that the lifecycle of the result is just incorrect — it is controlled solely by mono, but not by the Job as the caller might’ve expected.
It’s hard to tell if it is exactly wrong without knowing why Job is there in the first place
Potentially related to spring-projects/spring-data-commons#2532 cc @mp911de.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sdeleuze Sorry for the late feedback and thanks for taking your time to review this.
The reason why it had a Job
in the context in my use case is because the async server framework that I am using(Armeria) resorts to GlobalScope#future
to bridge its CompletableFuture
based implementations to the coroutine world.
I understand the concerns of Kotlin team worrying that the coroutine's lifecycle is no longer be managed by the caller's Job
context, but I thought it's okay to do this for this specific use case since the lifecycle control of the returned Mono
goes back into the caller's Job
context anyways in the subsequent call for awaitSingleOrNull
in this line.
Maybe the issue is in Armeria
's implementation and there won't be a Job
to be stripped away with WebFlux, but my current guess is that it won't be any different with WebFlux. I will run a test and share the result.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is indeed the same for WebFlux. I will provide your feedback to the Kotlin team, may be indeed ok for this specific use case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@qwwdfsad Could you please check @ks-yim previous comment, especially this quote below, to help us analyzing if removing Job.Key
from the CoroutineContext
is ok for this very specific use case where we intercept the suspending invocation to plug our Reactive transaction management and later invoke AwaitKt.awaitSingleOrNull
with the original continuation:
I understand the concerns of Kotlin team worrying that the coroutine's lifecycle is no longer be managed by the caller's Job context, but I thought it's okay to do this for this specific use case since the lifecycle control of the returned Mono goes back into the caller's Job context anyways in the subsequent call for awaitSingleOrNull in this line.
FWIW our tests are green with this implementation proposed in my refined commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Another guy from the coroutines project here) You are doing very fancy things here! It did take me a while to understand them. If the CompletableFuture
gets cancelled, it will cancel awaitSingleOrNull
; if awaitSingleOrNull
gets cancelled, it will cancel the mono
; if mono
completes with an error or a value, it will return that via awaitSingleOrNull
. So, in effect, you're getting the same job control with extra steps.
The only issue I see is coroutine code that explicitly does something with the Job
, like currentCoroutineContext()[Job]!!.invokeOnCompletion { println("ok") }
. If you intercept such code in a mono
, then, naturally, the Job
used will be the one created by mono
, not the Job
from future
. I don't see a way around this, though.
.../src/main/kotlin/org/springframework/transaction/reactive/TransactionalOperatorExtensions.kt
Show resolved
Hide resolved
Moving this issue back to waiting for triage while we are waiting more information on spring-projects/spring-data-commons#2532 side. |
Are there any updates on this issue? It's a bit of a roadblock for us - we rely on the context being propagated so can't use transactions at the moment :( |
@jamesbassett Any chance you could provide a repro since we have no feedback on that on the Spring Data issue? |
@sdeleuze I have created a reproducible example here https://github.com/JoeMaher/tx-coroutine-context-loss-example, please let me know if you have any queries/concerns |
Great, thanks, I will have a look shortly. |
Hi @sdeleuze did you get a chance to look at @JoeMaher 's reproducer? 🤞 |
Sorry for the delay but I am currently focused on the latest bits of native support for Spring Framework 6 RC1. |
Hi @sdeleuze just bumping this (now that Spring 6 is released!) |
Thanks for your patience all. I re-worked the PR after fixing the nullability issue via #29919 with this commit, I now need to have the confirmation from the Kotlin team this is ok to remove If validated, we may backport the fix to |
@mp911de @poutsma Could you please have a look to this commit and this comment from Kotlin team to check if you see something against merging this on The limitation on |
I did look, and can follow along with the discussion, but this is too far out of my area of expertise to give any insightful comment. If the PR resolves #27307, and does not break any existing code, than it seems good to me. |
The fix and tests for both functional and annotation variants are now merged in I evaluated if it is possible to merge it on |
That's fantastic news @sdeleuze thank you so much for progressing this 🥳 I'll give it a test drive when Boot 3.0.3 drops (I imagine that's later this month?) |
@jamesbassett there is the calendar with Spring Ecosystem releases https://calendar.spring.io/ |
Just dropping in to say thanks @sdeleuze I tested out Boot 3.0.3 today and can confirm that everything now works as expected (we're not losing context when using transactions) and |
Motivation:
CoroutineContext
#27307Modifications:
CoroutineContext
when converting coroutines intoMono
.CoroutinesUtils#invokeSuspendingFunction
TransactionalOperatorExtensions
Flow<T>.transactional(...)
TransactionalOperator.executeAndAwait(...)
TransactionalOperator.executeAndAwait(...)
's type parameter upper bound fromAny
toAny?