Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug related with context propagation (CoroutineServerInterceptor) #4894

Merged
merged 14 commits into from
Jun 14, 2023

Conversation

be-hase
Copy link
Member

@be-hase be-hase commented May 23, 2023

Motivation:

A great feature for Coroutines users has been introduced.
#4724

However, Armeria's RequestContext is not propagated when using CoroutineServerInterceptor.

Modifications:

  • Modify the CoroutineServerInterceptor so that the context is propagated correctly

Result:

override fun <I : Any, O : Any> asyncInterceptCall(
call: ServerCall<I, O>,
headers: Metadata,
next: ServerCallHandler<I, O>
): CompletableFuture<ServerCall.Listener<I>> {
check(call is AbstractServerCall) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this assertion is unneeded.


companion object {
@Suppress("UNCHECKED_CAST")
internal val COROUTINE_CONTEXT_KEY: Context.Key<CoroutineContext> =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used reflection... There is no other way.

@ikhoon
Copy link
Contributor

ikhoon commented May 23, 2023

This problem seems to be caused by ServerCalls.serverCallHandler() not being executed in the scope of CoroutineContextServerInterceptor.withGrpcContext() by offloading with CoroutineServerInterceptor.

A workaround for the problem would be to execute ArmeriaCoroutineContextInterceptor as the last interceptor before ServerCalls. It is feasible if we add ArmeriaCoroutineContextInterceptor to the first element of interceptors().

if (USE_COROUTINE_CONTEXT_INTERCEPTOR) {
final ServerInterceptor coroutineContextInterceptor =
new ArmeriaCoroutineContextInterceptor(useBlockingTaskExecutor);
interceptors().add(coroutineContextInterceptor);
}

What do you think of this approach instead of using reflection which would be a final solution if we have no other choices?

@ikhoon ikhoon added the defect label May 23, 2023
@ikhoon ikhoon added this to the 1.24.0 milestone May 23, 2023
@be-hase
Copy link
Member Author

be-hase commented May 24, 2023

If it only propagates the Armeria Context, your proposed fix is fine.

However, your suggestion does not propagate the Coroutine Context.
For example, the test case here does not pass.

my test code diff from this PR:
diff --git a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
index f8d5bcc6d..4e340e506 100644
--- a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
+++ b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
@@ -17,6 +17,8 @@
 package com.linecorp.armeria.server.grpc.kotlin
 
 import com.linecorp.armeria.common.annotation.UnstableApi
+import com.linecorp.armeria.internal.common.kotlin.ArmeriaRequestCoroutineContext
+import com.linecorp.armeria.internal.server.grpc.AbstractServerCall
 import com.linecorp.armeria.server.grpc.AsyncServerInterceptor
 import io.grpc.Context
 import io.grpc.Metadata
@@ -26,6 +28,8 @@ import io.grpc.ServerInterceptor
 import io.grpc.kotlin.CoroutineContextServerInterceptor
 import io.grpc.kotlin.GrpcContextElement
 import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.asCoroutineDispatcher
 import kotlinx.coroutines.future.future
 import java.util.concurrent.CompletableFuture
 import kotlin.coroutines.CoroutineContext
@@ -62,14 +66,24 @@ interface CoroutineServerInterceptor : AsyncServerInterceptor {
         headers: Metadata,
         next: ServerCallHandler<I, O>
     ): CompletableFuture<ServerCall.Listener<I>> {
-        // COROUTINE_CONTEXT_KEY.get():
-        //   It is necessary to propagate the CoroutineContext set by the previous CoroutineContextServerInterceptor.
-        //   (The ArmeriaRequestCoroutineContext is also propagated by CoroutineContextServerInterceptor)
-        // GrpcContextElement.current():
-        //   In gRPC-kotlin, the Coroutine Context is propagated using the gRPC Context.
-        return CoroutineScope(COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current()).future {
+        check(call is AbstractServerCall) {
+            throw IllegalArgumentException(
+                "Cannot use ${AsyncServerInterceptor::class.java.name} with a non-Armeria gRPC server"
+            )
+        }
+        val executor = call.blockingExecutor() ?: call.eventLoop()
+
+        return GlobalScope.future(executor.asCoroutineDispatcher() + ArmeriaRequestCoroutineContext(call.ctx())) {
             suspendedInterceptCall(call, headers, next)
         }
+//        // COROUTINE_CONTEXT_KEY.get():
+//        //   It is necessary to propagate the CoroutineContext set by the previous CoroutineContextServerInterceptor.
+//        //   (The ArmeriaRequestCoroutineContext is also propagated by CoroutineContextServerInterceptor)
+//        // GrpcContextElement.current():
+//        //   In gRPC-kotlin, the Coroutine Context is propagated using the gRPC Context.
+//        return CoroutineScope(COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current()).future {
+//            suspendedInterceptCall(call, headers, next)
+//        }
     }
 
     /**
diff --git a/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java b/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java
index 38b66d976..01943b8e2 100644
--- a/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java
+++ b/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java
@@ -948,6 +948,9 @@ public final class GrpcServiceBuilder {
     private ImmutableList.Builder<ServerInterceptor> interceptors() {
         if (interceptors == null) {
             interceptors = ImmutableList.builder();
+            final ServerInterceptor coroutineContextInterceptor =
+                    new ArmeriaCoroutineContextInterceptor(useBlockingTaskExecutor);
+            interceptors().add(coroutineContextInterceptor);
         }
         return interceptors;
     }
@@ -961,11 +964,11 @@ public final class GrpcServiceBuilder {
      */
     public GrpcService build() {
         final HandlerRegistry handlerRegistry;
-        if (USE_COROUTINE_CONTEXT_INTERCEPTOR) {
-            final ServerInterceptor coroutineContextInterceptor =
-                    new ArmeriaCoroutineContextInterceptor(useBlockingTaskExecutor);
-            interceptors().add(coroutineContextInterceptor);
-        }
+//        if (USE_COROUTINE_CONTEXT_INTERCEPTOR) {
+//            final ServerInterceptor coroutineContextInterceptor =
+//                    new ArmeriaCoroutineContextInterceptor(useBlockingTaskExecutor);
+//            interceptors().add(coroutineContextInterceptor);
+//        }
         if (!enableUnframedRequests && unframedGrpcErrorHandler != null) {
             throw new IllegalStateException(
                     "'unframedGrpcErrorHandler' can only be set if unframed requests are enabled");

We use reflection, but grpc-kotlin is rarely changed, and even if it is changed, I think that unit test is enough to cover it.

@ikhoon
Copy link
Contributor

ikhoon commented May 24, 2023

However, your suggestion does not propagate the Coroutine Context.

I see. I'm okay with reflection. But we need to consider two points in the current changes.

  • If an AsyncServerInterceptor that switches the current thread is used, COROUTINE_CONTEXT_KEY.get() depending on ThreadLocal would be not working.
  • The reflection could raise an exception although our CI passes. Gradle picks a higher version by default. If the higher version renames the field, the reflection is no longer valid.

I will leave some workarounds on the code review commnets.

// (The ArmeriaRequestCoroutineContext is also propagated by CoroutineContextServerInterceptor)
// GrpcContextElement.current():
// In gRPC-kotlin, the Coroutine Context is propagated using the gRPC Context.
return CoroutineScope(COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current()).future {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the previous plain AsyncServerInterceptor changes the current thread, we need to inject ArmeriaRequestCoroutineContext(call.ctx() again. What do you think of reviving the old code and combine them with the new one?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think even if thread switched, the coroutine will do the context propagation, so there is no problem.
Actually this test is pass.
https://github.com/be-hase/armeria/blob/e4aad71345c39853945fd97b2ea1f85feaf2fb3e/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt#L277-L286

I think it is a good idea to inject ArmeriaRequestCoroutineContext(call.ctx()) just in case.
Hmmm, which would you prefer?

Copy link
Contributor

@ikhoon ikhoon May 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind testing the following scenario?

private class MyAsyncInterceptor : AsyncServerInterceptor {
    override fun <I : Any, O : Any> asyncInterceptCall(
        call: ServerCall<I, O>,
        headers: Metadata,
        next: ServerCallHandler<I, O>
    ): CompletableFuture<ServerCall.Listener<I>> {
        return CompletableFuture.supplyAsync({
            next.startCall(call, headers)
        }, Executors.newSingleThreadExecutor())
    }
}

GrpcService.builder()
    .exceptionMapping(statusFunction)
    // applying order is MyAsyncInterceptor -> coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor
    .intercept(threadLocalInterceptor, authInterceptor, coroutineNameInterceptor, MyAsyncInterceptor())

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, my reply is late. I was on vacation.

Oh, I see what you mean by what you intend.
Indeed, I was able to confirm that test falls down when there is such an AsyncInterceptor.
However, test will still fail in this case.

What do you think of reviving the old code and combining them with the new one?

my test code diff from this PR:
diff --git a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
index f8d5bcc6d..d44a0fc73 100644
--- a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
+++ b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
@@ -17,6 +17,8 @@
 package com.linecorp.armeria.server.grpc.kotlin
 
 import com.linecorp.armeria.common.annotation.UnstableApi
+import com.linecorp.armeria.internal.common.kotlin.ArmeriaRequestCoroutineContext
+import com.linecorp.armeria.internal.server.grpc.AbstractServerCall
 import com.linecorp.armeria.server.grpc.AsyncServerInterceptor
 import io.grpc.Context
 import io.grpc.Metadata
@@ -26,6 +28,7 @@ import io.grpc.ServerInterceptor
 import io.grpc.kotlin.CoroutineContextServerInterceptor
 import io.grpc.kotlin.GrpcContextElement
 import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.asCoroutineDispatcher
 import kotlinx.coroutines.future.future
 import java.util.concurrent.CompletableFuture
 import kotlin.coroutines.CoroutineContext
@@ -62,12 +65,19 @@ interface CoroutineServerInterceptor : AsyncServerInterceptor {
         headers: Metadata,
         next: ServerCallHandler<I, O>
     ): CompletableFuture<ServerCall.Listener<I>> {
+        check(call is AbstractServerCall) {
+            throw IllegalArgumentException(
+                "Cannot use ${AsyncServerInterceptor::class.java.name} with a non-Armeria gRPC server"
+            )
+        }
+        val executor = call.blockingExecutor() ?: call.eventLoop()
+
         // COROUTINE_CONTEXT_KEY.get():
         //   It is necessary to propagate the CoroutineContext set by the previous CoroutineContextServerInterceptor.
         //   (The ArmeriaRequestCoroutineContext is also propagated by CoroutineContextServerInterceptor)
         // GrpcContextElement.current():
         //   In gRPC-kotlin, the Coroutine Context is propagated using the gRPC Context.
-        return CoroutineScope(COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current()).future {
+        return CoroutineScope(executor.asCoroutineDispatcher() + ArmeriaRequestCoroutineContext(call.ctx()) + COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current()).future {
             suspendedInterceptCall(call, headers, next)
         }
     }
diff --git a/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt b/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
index a05514ae4..f62fd9282 100644
--- a/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
+++ b/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
@@ -34,6 +34,7 @@ import com.linecorp.armeria.internal.testing.AnticipatedException
 import com.linecorp.armeria.server.ServerBuilder
 import com.linecorp.armeria.server.ServiceRequestContext
 import com.linecorp.armeria.server.auth.Authorizer
+import com.linecorp.armeria.server.grpc.AsyncServerInterceptor
 import com.linecorp.armeria.server.grpc.GrpcService
 import com.linecorp.armeria.testing.junit5.server.ServerExtension
 import io.grpc.Context
@@ -232,7 +233,12 @@ internal class CoroutineServerInterceptorTest {
                     GrpcService.builder()
                         .exceptionMapping(statusFunction)
                         // applying order is coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor
-                        .intercept(threadLocalInterceptor, authInterceptor, coroutineNameInterceptor)
+                        .intercept(
+                            threadLocalInterceptor,
+                            authInterceptor,
+                            coroutineNameInterceptor,
+                            MyAsyncInterceptor()
+                        )
                         .addService(TestService())
                         .build()
                 )
@@ -242,7 +248,12 @@ internal class CoroutineServerInterceptorTest {
                         .addService(TestService())
                         .exceptionMapping(statusFunction)
                         // applying order is coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor
-                        .intercept(threadLocalInterceptor, authInterceptor, coroutineNameInterceptor)
+                        .intercept(
+                            threadLocalInterceptor,
+                            authInterceptor,
+                            coroutineNameInterceptor,
+                            MyAsyncInterceptor()
+                        )
                         .useBlockingTaskExecutor(true)
                         .build()
                 )
@@ -322,6 +333,22 @@ internal class CoroutineServerInterceptorTest {
             }
         }
 
+        private class MyAsyncInterceptor : AsyncServerInterceptor {
+            override fun <I : Any, O : Any> asyncInterceptCall(
+                call: ServerCall<I, O>,
+                headers: Metadata,
+                next: ServerCallHandler<I, O>
+            ): CompletableFuture<ServerCall.Listener<I>> {
+                return CompletableFuture.supplyAsync({
+                    next.startCall(call, headers)
+                }, EXECUTOR)
+            }
+
+            companion object {
+                private val EXECUTOR = Executors.newSingleThreadExecutor()
+            }
+        }
+
         private class TestService : TestServiceGrpcKt.TestServiceCoroutineImplBase() {
             override suspend fun unaryCall(request: SimpleRequest): SimpleResponse {
                 assertContextPropagation()

As I wrote in the issue, it is necessary to propagate the grpc context correctly, so, for example, the following modification will cause the test to PASS.

private class MyAsyncInterceptor : AsyncServerInterceptor {
    override fun <I : Any, O : Any> asyncInterceptCall(
        call: ServerCall<I, O>,
        headers: Metadata,
        next: ServerCallHandler<I, O>
    ): CompletableFuture<ServerCall.Listener<I>> {
        val ctx = Context.current()
        return CompletableFuture.supplyAsync({
            val prev = ctx.attach()
            try {
                next.startCall(call, headers)
            } finally {
                ctx.detach(prev)
            }
        }, EXECUTOR)
    }

    companion object {
        private val EXECUTOR = Executors.newSingleThreadExecutor()
    }
}

By the way, this is NG.

private class MyAsyncInterceptor : AsyncServerInterceptor {
    override fun <I : Any, O : Any> asyncInterceptCall(
        call: ServerCall<I, O>,
        headers: Metadata,
        next: ServerCallHandler<I, O>
    ): CompletableFuture<ServerCall.Listener<I>> {
        val ctx = ServiceRequestContext.current()
        return CompletableFuture.supplyAsync({
            ctx.push().use {
                next.startCall(call, headers)
            }
        }, EXECUTOR)
    }

    companion object {
        private val EXECUTOR = Executors.newSingleThreadExecutor()
    }
}

I think AsyncServerInterceptor also needs to be modified.

Copy link
Contributor

@ikhoon ikhoon May 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test failure seems fixed if we add ArmeriaCoroutineContextInterceptor as the first interceptor. #4894 (comment)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, updated: 2f18c5f
At the current commit, test goes pass.

But.... . if I change the order of MyAuthInterceptor, the test doesn't pass again...

diff --git a/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt b/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
index c8ca61e49..b0dcabca8 100644
--- a/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
+++ b/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
@@ -234,10 +234,10 @@ internal class CoroutineServerInterceptorTest {
                         .exceptionMapping(statusFunction)
                         // applying order is MyAsyncInterceptor -> coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor
                         .intercept(
+                            MyAsyncInterceptor(),
                             threadLocalInterceptor,
                             authInterceptor,
                             coroutineNameInterceptor,
-                            MyAsyncInterceptor(),
                         )
                         .addService(TestService())
                         .build()
@@ -249,10 +249,10 @@ internal class CoroutineServerInterceptorTest {
                         .exceptionMapping(statusFunction)
                         // applying order is MyAsyncInterceptor -> coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor
                         .intercept(
+                            MyAsyncInterceptor(),
                             threadLocalInterceptor,
                             authInterceptor,
                             coroutineNameInterceptor,
-                            MyAsyncInterceptor(),
                         )
                         .useBlockingTaskExecutor(true)
                         .build()

Copy link
Contributor

@ikhoon ikhoon Jun 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val ctx = Context.current()
return CompletableFuture.supplyAsync({
   val prev = ctx.attach()
   try {
       next.startCall(call, headers)
   } finally {
       ctx.detach(prev)
   }
}, EXECUTOR)

After some experiments, I figured out that these manual attach and detach seem the only way to propagate the gRPC's context correctly when the intercepting thread is changed.

I am okay with this PR if we can change ArmeriaCoroutineContextInterceptor to get ServiceRequestContext without relying on the current thread. @be-hase What do you think?

diff --git grpc/src/main/java/com/linecorp/armeria/server/grpc/ArmeriaCoroutineContextInterceptor.java grpc/src/main/java/com/linecorp/armeria/server/grpc/ArmeriaCoroutineContextInterceptor.java
index 223afad48..05add2d4c 100644
--- grpc/src/main/java/com/linecorp/armeria/server/grpc/ArmeriaCoroutineContextInterceptor.java
+++ grpc/src/main/java/com/linecorp/armeria/server/grpc/ArmeriaCoroutineContextInterceptor.java
@@ -16,8 +16,11 @@
 
 package com.linecorp.armeria.server.grpc;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import java.util.concurrent.ScheduledExecutorService;
 
+import com.linecorp.armeria.internal.server.grpc.AbstractServerCall;
 import com.linecorp.armeria.server.ServiceRequestContext;
 
 import io.grpc.Metadata;
@@ -36,7 +39,10 @@ final class ArmeriaCoroutineContextInterceptor extends CoroutineContextServerInt
 
     @Override
     public CoroutineContext coroutineContext(ServerCall<?, ?> serverCall, Metadata metadata) {
-        final ServiceRequestContext ctx = ServiceRequestContext.current();
+        checkState(serverCall instanceof AbstractServerCall,
+                   "Cannot use %s with a non-Armeria gRPC server",
+                   ArmeriaCoroutineContextInterceptor.class.getName());
+        final ServiceRequestContext ctx = ((AbstractServerCall<?, ?>) serverCall).ctx();
         final ArmeriaRequestCoroutineContext coroutineContext = new ArmeriaRequestCoroutineContext(ctx);
         final ScheduledExecutorService executor;
         if (useBlockingTaskExecutor) {

diff --git grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
index c8ca61e49..f30c64ae3 100644
--- grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
+++ grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
@@ -19,6 +19,7 @@ package com.linecorp.armeria.server.grpc.kotlin
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import com.google.protobuf.ByteString
 import com.linecorp.armeria.client.grpc.GrpcClients
+import com.linecorp.armeria.client.logging.LoggingClient
 import com.linecorp.armeria.common.RequestContext
 import com.linecorp.armeria.common.auth.AuthToken
 import com.linecorp.armeria.common.grpc.GrpcStatusFunction
@@ -234,6 +235,7 @@ internal class CoroutineServerInterceptorTest {
                         .exceptionMapping(statusFunction)
                         // applying order is MyAsyncInterceptor -> coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor
                         .intercept(
+                            MyAsyncInterceptor(),
                             threadLocalInterceptor,
                             authInterceptor,
                             coroutineNameInterceptor,
@@ -249,6 +251,7 @@ internal class CoroutineServerInterceptorTest {
                         .exceptionMapping(statusFunction)
                         // applying order is MyAsyncInterceptor -> coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor
                         .intercept(
+                            MyAsyncInterceptor(),
                             threadLocalInterceptor,
                             authInterceptor,
                             coroutineNameInterceptor,
@@ -339,8 +342,9 @@ internal class CoroutineServerInterceptorTest {
                 headers: Metadata,
                 next: ServerCallHandler<I, O>
             ): CompletableFuture<ServerCall.Listener<I>> {
+                val current = Context.current()
                 return CompletableFuture.supplyAsync({
-                    next.startCall(call, headers)
+                    current.call { next.startCall(call, headers) }
                 }, EXECUTOR)
             }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied the patch to e2af251 (#4894)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay in getting back to you. Thanks for catching up.

After some experiments, I figured out that these manual attach and detach seem the only way to propagate the gRPC's context correctly when the intercepting thread is changed.

Yes, I agree with you.
I generally agree with your fix, but I prefer to focus only on propagating the grpc context (so the armeria context is propagated along with it) and write it this way.
However, this is a matter of personal preference and can be ignored.

diff --git a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
index 5af2461cd..4e2496d67 100644
--- a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
+++ b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
@@ -17,8 +17,6 @@
 package com.linecorp.armeria.server.grpc.kotlin
 
 import com.linecorp.armeria.common.annotation.UnstableApi
-import com.linecorp.armeria.internal.common.kotlin.ArmeriaRequestCoroutineContext
-import com.linecorp.armeria.internal.server.grpc.AbstractServerCall
 import com.linecorp.armeria.server.grpc.AsyncServerInterceptor
 import io.grpc.Context
 import io.grpc.Metadata
@@ -28,7 +26,6 @@ import io.grpc.ServerInterceptor
 import io.grpc.kotlin.CoroutineContextServerInterceptor
 import io.grpc.kotlin.GrpcContextElement
 import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.asCoroutineDispatcher
 import kotlinx.coroutines.future.future
 import java.util.concurrent.CompletableFuture
 import kotlin.coroutines.CoroutineContext
@@ -65,21 +62,13 @@ interface CoroutineServerInterceptor : AsyncServerInterceptor {
         headers: Metadata,
         next: ServerCallHandler<I, O>
     ): CompletableFuture<ServerCall.Listener<I>> {
-        check(call is AbstractServerCall) {
-            throw IllegalArgumentException(
-                "Cannot use ${AsyncServerInterceptor::class.java.name} with a non-Armeria gRPC server"
-            )
-        }
-        val executor = call.blockingExecutor() ?: call.eventLoop()
-
         // COROUTINE_CONTEXT_KEY.get():
         //   It is necessary to propagate the CoroutineContext set by the previous CoroutineContextServerInterceptor.
         //   (The ArmeriaRequestCoroutineContext is also propagated by CoroutineContextServerInterceptor)
         // GrpcContextElement.current():
         //   In gRPC-kotlin, the Coroutine Context is propagated using the gRPC Context.
         return CoroutineScope(
-            executor.asCoroutineDispatcher() + ArmeriaRequestCoroutineContext(call.ctx()) +
-                COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current()
+            COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current()
         ).future {
             suspendedInterceptCall(call, headers, next)
         }
diff --git a/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java b/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java
index 5500506b5..6b61f89a2 100644
--- a/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java
+++ b/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java
@@ -948,11 +948,6 @@ public final class GrpcServiceBuilder {
     private ImmutableList.Builder<ServerInterceptor> interceptors() {
         if (interceptors == null) {
             interceptors = ImmutableList.builder();
-            if (USE_COROUTINE_CONTEXT_INTERCEPTOR) {
-                final ServerInterceptor coroutineContextInterceptor =
-                        new ArmeriaCoroutineContextInterceptor(useBlockingTaskExecutor);
-                interceptors.add(coroutineContextInterceptor);
-            }
         }
         return interceptors;
     }
@@ -966,6 +961,11 @@ public final class GrpcServiceBuilder {
      */
     public GrpcService build() {
         final HandlerRegistry handlerRegistry;
+        if (USE_COROUTINE_CONTEXT_INTERCEPTOR) {
+            final ServerInterceptor coroutineContextInterceptor =
+                    new ArmeriaCoroutineContextInterceptor(useBlockingTaskExecutor);
+            interceptors().add(coroutineContextInterceptor);
+        }
         if (!enableUnframedRequests && unframedGrpcErrorHandler != null) {
             throw new IllegalStateException(
                     "'unframedGrpcErrorHandler' can only be set if unframed requests are enabled");

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense. Updated!

@ikhoon
Copy link
Contributor

ikhoon commented Jun 9, 2023

This PR is ready to review. PTAL. 🙇‍♂️

Copy link
Member

@trustin trustin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent feedback and fix, @be-hase and @ikhoon! Looking forward to hear more from you about Kotlin-Armeria interoperability, @be-hase. Please feel free to keep bringing up more issues and questions! 🙇

Copy link
Contributor

@jrhee17 jrhee17 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes make sense 👍 Learned a lot from this PR. Thanks @be-hase @ikhoon 🙇 👍 🙇

@CLAassistant
Copy link

CLAassistant commented Jun 12, 2023

CLA assistant check
All committers have signed the CLA.

Copy link
Contributor

@ikhoon ikhoon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @be-hase! 👍 It was a pleasant experience building this PR together. ❤️

Copy link
Member

@trustin trustin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice fix, @be-hase and @ikhoon! 🙇

Copy link
Member

@minwoox minwoox left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @be-hase! 🙇

@minwoox minwoox merged commit 6cc8f88 into line:main Jun 14, 2023
13 checks passed
@minwoox
Copy link
Member

minwoox commented Jun 14, 2023

Thanks a lot, @be-hase for fixing this bug and @ikhoon for helping complete this. 🙇

@be-hase be-hase deleted the issue-4889 branch June 14, 2023 12:41
@be-hase
Copy link
Member Author

be-hase commented Jun 14, 2023

Hey guys, thanks for the review of the difficult context propagation. I also enjoyed it. :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Armeria's RequestContext is not propagated when using CoroutineServerInterceptor
6 participants