Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using an parTraverse inside another parTraverse causes a hang #2764

Closed
Vaysman opened this issue Jul 14, 2022 · 6 comments
Closed

Using an parTraverse inside another parTraverse causes a hang #2764

Vaysman opened this issue Jul 14, 2022 · 6 comments

Comments

@Vaysman
Copy link

Vaysman commented Jul 14, 2022

Hi!

First of all! Thanks for the Arrow. I like it very much.

Maybe I'm doing something wrong, but I haven't found anywhere a restriction on using a parTraverse inside another parTraverse.

Here's my example:

class Example {
    fun doIt(data: Data) = runBlocking {
        effect<ExampleError, Unit> {
            val result = doIt1(data).bind()
            println(result.size)
        }.fold(
            {
                when (it) {
                    ExampleError.NullError -> println("str is null")
                }
            },
            {}
        )
    }


    private fun doIt1(data: Data) =
        effect<ExampleError, List<List<String>>> {
            data.nested.parTraverse {nested ->
                doIt2(nested).bind()
            }
        }


    private fun doIt2(nested: NestedData) =
        effect<ExampleError, List<String>> {
            nested.nestedNested.parTraverse { nestedNested ->
                val str = ensureNotNull(nestedNested.str) { ExampleError.NullError }
                str
            }
        }
}

When I run this code in the test, everything hangs and I see this exception

Exception in thread "Test worker" kotlinx.coroutines.CoroutinesInternalError: Fatal exception in coroutines machinery for CancellableContinuation(DispatchedContinuation[BlockingEventLoop@895e367, Continuation at arrow.fx.coroutines.ParTraverse__ParTraverseKt$parTraverse$3.invokeSuspend(ParTraverse.kt:207)@329dbdbf]){Completed}@4ef782af. Please read KDoc to 'handleFatalException' method and report this incident to maintainers
	at kotlinx.coroutines.DispatchedTask.handleFatalException(DispatchedTask.kt:144)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:115)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:279)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at example.Example.doIt(Example.kt:9)
	at example.ExampleTest.should pass$ArrowEffect(ExampleTest.kt:20)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:686)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:205)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:201)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
	at jdk.proxy1/jdk.proxy1.$Proxy2.stop(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
	Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [CoroutineId(1), "coroutine#1":ScopeCoroutine{Cancelled}@6d7fc27, BlockingEventLoop@895e367]
Caused by: ShiftCancellationException(Shifted Continuation)
	at example.Example$doIt$1$invokeSuspend$$inlined$effect$1$2.shift(Effect.kt:776)
	at arrow.core.continuations.EffectScope$bind$2.invoke(EffectScope.kt:57)
	at arrow.core.continuations.EffectScope$bind$2.invoke(EffectScope.kt:57)
	at arrow.core.continuations.FoldContinuation$resumeWith$2$f$1.invokeSuspend(Effect.kt:715)
	at arrow.core.continuations.FoldContinuation$resumeWith$2$f$1.invoke(Effect.kt)
	at arrow.core.continuations.FoldContinuation$resumeWith$2$f$1.invoke(Effect.kt)
	at arrow.core.continuations.FoldContinuation.resumeWith(Effect.kt:716)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
	at kotlinx.coroutines.internal.ScopeCoroutine.afterResume(Scopes.kt:33)
	at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:102)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104)
	... 91 more

I also created a separate example project with a test, so that you can easily check.

@nomisRev
Copy link
Member

Hey @Vaysman,

strange there should not be such a restriction, and I’ve used nested parTraverse in the past. Thanks for the reproducible examples! That will certainly help investigate this.

This seems related to #2760 (comment)

and thank you for the support, and nice words about Arrow 🙌

@nomisRev
Copy link
Member

It is indeed related to #2760.

I've pinpointed the problematic code:

  fun doIt1(data: Data) =
    effect<ExampleError, List<List<String>>> {
      data.nested.parTraverse { nested ->
        doIt2(nested)
          .bind() // <-- Fails, and makes Coroutines hang
          // .fold({ shift(it) }, { it }) // <-- Works fine
          // .toEither().bind() // <-- Works fine
      }
    }

So it seems that somehow bind() being used from a nested coroutine is what causes the issue.
parTraverse creates List#size Coroutines to run the task in parallel, and it seems that bind() is somehow creating incorrect code.

This is the case for Arrow 1.1.2 & 1.1.3-alpha.30 in combination with Kotlin 1.6.20, 1.7.0 or 1.7.10.
I am currently looking for a fix / workaround, and if I find one I'll open a PR and link it here ;)

For now you can rely on toEither().bind() as a temporary workaround.

@Vaysman
Copy link
Author

Vaysman commented Jul 14, 2022

It is indeed related to #2760.

For now you can rely on toEither().bind() as a temporary workaround.

@nomisRev Thanks for the quick response. And thanks so much for the workaround.

@nomisRev
Copy link
Member

I found a fix, you can check out the PR here. #2765
I tested your Ktor scenario as well, and it also fixes the issue there.I found a fix, you can check out the PR here.

Thanks for the simplified example @Vaysman! It was very helpful.

@nomisRev
Copy link
Member

Oh this issue closed automatically.

@Vaysman the fix is available in 1.1.3-alpha.31
If you were able to test it, and it works please report back here 🙏

thanks for opening the issue with a reproducible repo. That was awesome!

@nomisRev nomisRev mentioned this issue Aug 1, 2022
@nomisRev
Copy link
Member

nomisRev commented Aug 3, 2022

A new, and better fix been made is available in 1.1.3-alpha.38
If you have some time could you check it, and report back please? 🙏

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants