
Deadlock in BasicPullResponseHandler from neo4j-java-driver when used in a reactive way while cancelling unfinished Reactor subscription #1230

Closed
stepanv opened this issue May 20, 2022 · 5 comments · Fixed by #1233

Comments

@stepanv

stepanv commented May 20, 2022

We detected a deadlock in our production setup between two threads, each acquiring the same two locks but in the opposite order.

See the two call stacks from a thread dump we collected:

"default-nioEventLoopGroup-1-2" #46 prio=5 os_prio=0 cpu=665086.46ms elapsed=268644.92s tid=0x00007f35e897f550 nid=0x31 waiting for monitor entry  [0x00007f35c1122000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler.cancel(BasicPullResponseHandler.java)
	- waiting to lock <0x000000072488f200> (a org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler)
	at org.neo4j.driver.internal.cursor.RxResultCursorImpl.cancel(RxResultCursorImpl.java:103)
	at org.neo4j.driver.internal.reactive.InternalRxResult$$Lambda$1634/0x0000000801da7090.dispose(Unknown Source)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxCreate$SinkDisposable.cancel(FluxCreate.java:1039)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxCreate$BaseSink.disposeResource(FluxCreate.java:473)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxCreate$BaseSink.cancel(FluxCreate.java:462)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.StrictSubscriber.cancel(StrictSubscriber.java:155)
	at reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at reactor.core.publisher.StrictSubscriber.cancel(StrictSubscriber.java:155)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.cancel(FluxUsingWhen.java:326)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators$DeferredSubscription.cancel(Operators.java:1633)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxUsingWhen$ResourceSubscriber.cancel(FluxUsingWhen.java:248)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.cancel(FluxRetryWhen.java:163)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.StrictSubscriber.cancel(StrictSubscriber.java:155)
	at reactor.core.publisher.FluxUsing$UsingSubscriber.cancel(FluxUsing.java:176)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:174)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:174)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:174)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.dispose(FluxPublish.java:301)
	at reactor.core.publisher.FluxRefCount.cancel(FluxRefCount.java:106)
	at reactor.core.publisher.FluxRefCount$RefCountMonitor.innerCancelled(FluxRefCount.java:153)
	at reactor.core.publisher.FluxRefCount$RefCountInner.cancel(FluxRefCount.java:231)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:174)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:174)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at reactor.core.publisher.FluxZip$ZipInner.cancel(FluxZip.java:954)
	at reactor.core.publisher.FluxZip$ZipCoordinator.cancelAll(FluxZip.java:652)
	at reactor.core.publisher.FluxZip$ZipCoordinator.cancel(FluxZip.java:616)
	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.cancel(MonoCollectList.java:144)
	- locked <0x000000072488fa10> (a reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:130)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.cancel(FluxConcatArray.java:286)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:167)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.cancel(FluxPeek.java:153)
	at reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.cancel(FluxFlatMap.java:1022)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:340)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:219)
	at reactor.core.publisher.FlatMapTracker.unsubscribe(FluxFlatMap.java:1083)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:360)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.cancel(FluxDoFinally.java:151)
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:167)
	at reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:131)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.cancel(FluxPeek.java:153)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.cancel(FluxTimeout.java:251)
	at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157)
	at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.cancel(FluxSubscribeOn.java:203)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.cancel(FluxPeek.java:153)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.cancel(FluxPeek.java:153)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.cancel(FluxDoFinally.java:151)
	at reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.cancel(FluxFlatMap.java:1022)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:340)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:219)
	at reactor.core.publisher.FlatMapTracker.unsubscribe(FluxFlatMap.java:1083)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:360)
	at reactor.core.publisher.FluxTakeUntilOther$TakeUntilMainSubscriber.cancelMain(FluxTakeUntilOther.java:182)
	at reactor.core.publisher.FluxTakeUntilOther$TakeUntilMainSubscriber.cancel(FluxTakeUntilOther.java:199)
	at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:167)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at reactor.core.publisher.StrictSubscriber.cancel(StrictSubscriber.java:155)
	at io.micronaut.http.netty.reactive.HandlerSubscriber.cancel(HandlerSubscriber.java:239)
	at io.micronaut.http.netty.reactive.HandlerSubscriber.channelInactive(HandlerSubscriber.java:141)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at io.netty.handler.codec.http.HttpContentDecoder.channelInactive(HttpContentDecoder.java:235)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at io.netty.handler.codec.http.HttpContentEncoder.channelInactive(HttpContentEncoder.java:313)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.handler.flow.FlowControlHandler.channelInactive(FlowControlHandler.java:134)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelInactive(CombinedChannelDuplexHandler.java:418)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:357)
	at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:221)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17/Thread.java:833)

"Neo4jDriverIO-2-3" #62 daemon prio=10 os_prio=0 cpu=459229.04ms elapsed=268640.21s tid=0x00007f35beb0dfd0 nid=0x3f waiting for monitor entry  [0x00007f35bcefc000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onNext(MonoCollectList.java:90)
	- waiting to lock <0x000000072488fa10> (a reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:756)
	at reactor.core.publisher.FluxZip$ZipInner.onNext(FluxZip.java:915)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:477)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:268)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxUsing$UsingSubscriber.onNext(FluxUsing.java:202)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345)
	at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:618)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:154)
	at org.neo4j.driver.internal.reactive.InternalRxResult.lambda$createRecordConsumer$3(InternalRxResult.java:95)
	at org.neo4j.driver.internal.reactive.InternalRxResult$$Lambda$1633/0x0000000801da6e58.accept(Unknown Source)
	at org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler.handleRecord(BasicPullResponseHandler.java:134)
	at org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler$State$2.onRecord(BasicPullResponseHandler.java:308)
	at org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler.onRecord(BasicPullResponseHandler.java:89)
	- locked <0x000000072488f200> (a org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler)
	at org.neo4j.driver.internal.handlers.RoutingResponseHandler.onRecord(RoutingResponseHandler.java:69)
	at org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleRecordMessage(InboundMessageDispatcher.java:114)
	at org.neo4j.driver.internal.messaging.common.CommonMessageReader.unpackRecordMessage(CommonMessageReader.java:94)
	at org.neo4j.driver.internal.messaging.common.CommonMessageReader.read(CommonMessageReader.java:65)
	at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:83)
	at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:35)
	at org.neo4j.driver.internal.shaded.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
	at org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:47)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.neo4j.driver.internal.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.neo4j.driver.internal.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at org.neo4j.driver.internal.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at org.neo4j.driver.internal.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17/Thread.java:833)


Here you can see that:

  • thread Neo4jDriverIO-2-3 is emitting data. While executing the onNext chain, it first locks the BasicPullResponseHandler monitor in onRecord() and later tries to lock the MonoCollectListSubscriber monitor in onNext();
  • thread default-nioEventLoopGroup-1-2 is handling an external event (in this case Netty publishing a channel close, which in turn makes Micronaut cancel the reactive chain). It locks the MonoCollectListSubscriber monitor while running cancel() and then tries to lock BasicPullResponseHandler in its cancel() method.

Because each thread holds the monitor the other one is waiting for, the two threads end up in a deadlock.
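The lock-order inversion above can be reproduced without Neo4j or Reactor. The following is a minimal, hypothetical Java sketch, not driver or Reactor code: two plain monitors stand in for BasicPullResponseHandler and MonoCollectListSubscriber, a CountDownLatch forces the same unlucky interleaving that the breakpoint produces in the reproducer below, and ThreadMXBean.findMonitorDeadlockedThreads() confirms the deadlock. The class, field and method names are invented for illustration.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

public class LockOrderInversionDemo {

    // Hypothetical stand-ins for the two monitors in the thread dump.
    static final Object handlerMonitor = new Object();   // ~ BasicPullResponseHandler
    static final Object collectorMonitor = new Object(); // ~ MonoCollectListSubscriber

    /** Starts both threads and returns true once the JVM reports them as monitor-deadlocked. */
    static boolean reproduce() {
        // Each thread waits here after taking its first lock, forcing the unlucky interleaving.
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        // Like Neo4jDriverIO-2-3: onRecord() holds the handler, then onNext() needs the collector.
        Thread emitter = new Thread(() -> {
            synchronized (handlerMonitor) {
                arriveAndAwait(bothHoldFirstLock);
                synchronized (collectorMonitor) {
                    // never reached: the canceller holds collectorMonitor
                }
            }
        }, "emitter");

        // Like default-nioEventLoopGroup-1-2: cancel() holds the collector, then needs the handler.
        Thread canceller = new Thread(() -> {
            synchronized (collectorMonitor) {
                arriveAndAwait(bothHoldFirstLock);
                synchronized (handlerMonitor) {
                    // never reached: the emitter holds handlerMonitor
                }
            }
        }, "canceller");

        // Daemon threads, so the stuck pair does not keep the JVM alive.
        emitter.setDaemon(true);
        canceller.setDaemon(true);
        emitter.start();
        canceller.start();

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (int attempt = 0; attempt < 50; attempt++) {
            long[] deadlocked = threads.findMonitorDeadlockedThreads(); // null if none
            if (deadlocked != null && deadlocked.length >= 2) {
                return true;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    /** Counts down, then waits until every participant has counted down. */
    static void arriveAndAwait(CountDownLatch latch) {
        latch.countDown();
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("monitor deadlock detected: " + reproduce());
    }
}
```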

  • Neo4j version: Enterprise 4.4.3
  • Neo4j Mode: Single instance
  • Driver version: Java driver 4.4.3
  • Operating system: reproduced locally on macOS with Neo4j running in Docker; originally detected on AWS, in an Alpine-based Java container on Kubernetes

Steps to reproduce

  1. Start Neo4j in Docker.
  2. Prepare the following test:
import org.junit.jupiter.api.Test;
import org.neo4j.configuration.GraphDatabaseSettings;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Config;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Logging;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.reactive.RxSession;
import reactor.core.publisher.Flux;

import java.time.Duration;

/**
 * Put a breakpoint in
 *
 * reactor.core.publisher.MonoCollectList.MonoCollectListSubscriber#onNext(java.lang.Object)
 *
 * and configure it to suspend only the current thread (not all threads).
 *
 * Start the test in debug mode. When the breakpoint is hit, wait about 10 seconds and then resume.
 * You will hit the deadlock and the main thread will never finish,
 * so you have as much time as you need to collect a thread dump.
 */
class Neo4jDeadlockTest {

    @Test
    void neo4j_deadlock_withMonoList() {
        // Given
        Config config = Config.builder()
                .withoutEncryption()
                .withLogging(Logging.slf4j())
                .withEventLoopThreads(5)
                .build();

        try (var driver = GraphDatabase.driver("bolt://localhost:55006", AuthTokens.basic("neo4j", "test"), config)) {

            var publish = Flux.using(
                            () -> driver.rxSession(SessionConfig.forDatabase(GraphDatabaseSettings.DEFAULT_DATABASE_NAME)),
                            session -> session.readTransaction(tx -> Flux.from(tx.run("call db.ping()").records())),
                            RxSession::close
                    )
                    .collectList();

            try {
                // When
                System.out.println("Total count: " + publish.block(Duration.ofSeconds(5)).size());

            } catch (RuntimeException e) {
                // received a timeout (which will never happen because of the deadlock)

                // Then
                throw new IllegalStateException("You will never receive this message because the main thread is in deadlock with the Neo4jDriver thread");

            }

            throw new IllegalStateException("You didn't hit the deadlock because you didn't pause the onNext signal");
        }
    }
}
  3. Put a breakpoint in reactor.core.publisher.MonoCollectList.MonoCollectListSubscriber#onNext(java.lang.Object) and set it to suspend only the current thread.

  4. Run the test in debug mode.

  5. When you hit the breakpoint, wait at least 6 seconds (until block(Duration.ofSeconds(5)) times out and the main thread tries to cancel the subscription).

  6. Resume.

  7. The test never completes, because the main thread and the Neo4j driver thread are deadlocked.

  8. Collect the thread dump.

This reproducer only demonstrates how the problem can be triggered synthetically: the breakpoint forces the two threads to interleave in exactly the wrong order, which ends in a deadlock.

We hit this deadlock while running standard business logic in production on AWS (no breakpoints needed :) ).

Expected behavior

The Neo4j driver's BasicPullResponseHandler should not deadlock with Reactor primitives.

@stepanv
Author

stepanv commented May 24, 2022

To work around this problem, you can make the reactive stream propagate the cancellation on a different thread. The thread switch must happen after the cancelling thread has executed MonoCollectList.cancel(), which means cancelOn must be declared before the .collectList() call:

var publish = Flux.using(
                () -> driver.rxSession(SessionConfig.forDatabase(GraphDatabaseSettings.DEFAULT_DATABASE_NAME)),
                session -> session.readTransaction(tx -> Flux.from(tx.run("call db.ping()").records())),
                rxSession -> {
                    rxSession.close();
                }
        )
        .cancelOn(Schedulers.parallel())
        .collectList();

Note that the following would not work, because the parallel thread itself would end up in the deadlock:

      .collectList()
      .cancelOn(Schedulers.parallel());

Also note that this workaround only works because the other Subscription implementations in the chain do not use the same lock for onNext() and cancel().
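The effect of declaring cancelOn before collectList() can be illustrated in plain Java. This is a hypothetical sketch, not Reactor's implementation: a single daemon executor stands in for Schedulers.parallel(), and the collector's cancel() hands the upstream cancellation to that executor instead of calling it inline. The collector monitor is therefore released before anyone needs the handler monitor, so the same interleaving that deadlocked above now completes. All names are invented for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CancelOnOtherThreadDemo {

    /** Replays the deadlocking interleaving, but hops the upstream cancel to another thread. */
    static boolean run() {
        Object handlerMonitor = new Object();   // ~ BasicPullResponseHandler
        Object collectorMonitor = new Object(); // ~ MonoCollectListSubscriber
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
        CountDownLatch upstreamCancelled = new CountDownLatch(1);

        // Hypothetical stand-in for Schedulers.parallel(): one daemon worker thread.
        ExecutorService cancelWorker = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "cancel-worker");
            t.setDaemon(true);
            return t;
        });

        Thread emitter = new Thread(() -> {
            synchronized (handlerMonitor) {             // onRecord()
                arriveAndAwait(bothHoldFirstLock);
                synchronized (collectorMonitor) {       // onNext(): waits until the canceller leaves
                    // record collected
                }
            }
        }, "emitter");

        Thread canceller = new Thread(() -> {
            synchronized (collectorMonitor) {           // collector cancel()
                arriveAndAwait(bothHoldFirstLock);
                // Calling the upstream cancel inline would need handlerMonitor while
                // collectorMonitor is still held. Schedule it on the worker instead.
                cancelWorker.execute(() -> {
                    synchronized (handlerMonitor) {     // upstream cancel()
                        upstreamCancelled.countDown();
                    }
                });
            }                                           // collectorMonitor released here
        }, "canceller");

        emitter.setDaemon(true);
        canceller.setDaemon(true);
        emitter.start();
        canceller.start();
        try {
            emitter.join(5_000);
            canceller.join(5_000);
            return !emitter.isAlive()
                    && !canceller.isAlive()
                    && upstreamCancelled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            cancelWorker.shutdown();
        }
    }

    /** Counts down, then waits until every participant has counted down. */
    static void arriveAndAwait(CountDownLatch latch) {
        latch.countDown();
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed without deadlock: " + run());
    }
}
```

Moving the hop after collectList() would not help in this model: the worker thread would then be the one that takes the collector monitor and, while holding it, calls into the handler, recreating the same inversion.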

@injectives
Contributor

Thank you for the detailed explanation of the problem. We will see if we can improve it.

In the meantime, have you had a chance to try the following update? reactor/reactor-core#3053

@stepanv
Author

stepanv commented May 26, 2022

@injectives, yes, Simon's improvement prevents the deadlock I reported.
However, as he points out, the driver should be ready to accept concurrent cancel() and onNext() calls. That is currently not possible, because these methods in BasicPullResponseHandler.java are synchronized and therefore share the same monitor (this). Furthermore, the lock is held while calling into the upstream (in cancel()) and downstream (in onRecord()) reactive stream.
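One general way to meet both points is the "open call" pattern: hold the lock only around the handler's own state and release it before calling downstream (on a record) or upstream (on cancel). The sketch below is a hypothetical illustration of that pattern, not the change made in #1233; OpenCallPullHandler, its constructor parameters and the string rows are invented for the example. It replays the interleaving from the thread dump and finishes instead of deadlocking.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class OpenCallHandlerDemo {

    /** Hypothetical handler: guards its own state with a lock but never calls out while holding it. */
    static final class OpenCallPullHandler {
        private final Object lock = new Object();
        private final Consumer<String> downstream; // e.g. the record consumer
        private final Runnable upstreamCancel;     // e.g. telling the connection to stop streaming
        private boolean cancelled;

        OpenCallPullHandler(Consumer<String> downstream, Runnable upstreamCancel) {
            this.downstream = downstream;
            this.upstreamCancel = upstreamCancel;
        }

        void onRecord(String row) {
            synchronized (lock) {
                if (cancelled) {
                    return; // drop rows once cancellation has been observed
                }
            }
            // The lock is released before calling downstream. A row racing with cancel()
            // may still be delivered; Reactive Streams allows onNext after cancel.
            downstream.accept(row);
        }

        void cancel() {
            boolean firstCancel;
            synchronized (lock) {
                firstCancel = !cancelled;
                cancelled = true;
            }
            if (firstCancel) {
                upstreamCancel.run(); // also called without holding the handler lock
            }
        }
    }

    /** Replays the interleaving from the thread dump; returns true if both threads finish. */
    static boolean run() {
        Object collectorMonitor = new Object(); // ~ MonoCollectListSubscriber
        CountDownLatch bothInPlace = new CountDownLatch(2);
        AtomicInteger upstreamCancels = new AtomicInteger();

        OpenCallPullHandler handler = new OpenCallPullHandler(
                row -> {
                    arriveAndAwait(bothInPlace);
                    synchronized (collectorMonitor) { // onNext(): waits until the canceller leaves
                        // row collected
                    }
                },
                upstreamCancels::incrementAndGet);

        Thread emitter = new Thread(() -> handler.onRecord("row-1"), "emitter");
        Thread canceller = new Thread(() -> {
            synchronized (collectorMonitor) {   // collector cancel()
                arriveAndAwait(bothInPlace);
                handler.cancel();                // takes the handler lock only briefly
            }
        }, "canceller");

        emitter.setDaemon(true);
        canceller.setDaemon(true);
        emitter.start();
        canceller.start();
        try {
            emitter.join(5_000);
            canceller.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !emitter.isAlive() && !canceller.isAlive() && upstreamCancels.get() == 1;
    }

    /** Counts down, then waits until every participant has counted down. */
    static void arriveAndAwait(CountDownLatch latch) {
        latch.countDown();
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed without deadlock: " + run());
    }
}
```

The trade-off is that the handler can no longer guarantee that no row is delivered after cancel(); it only guarantees that it never blocks a caller that holds some other lock while it talks to the rest of the stream.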

@injectives
Contributor

@stepanv, would you mind giving the following update a go? #1233
I have checked it locally and it seems to prevent this issue from happening.

@stepanv
Author

stepanv commented Jun 9, 2022

Hi, I'm sorry for the late response.
With the 4.4.6 version, I'm no longer hitting the deadlock. Thanks!
