
Deadlock in BasicPullResponseHandler (neo4j-java-driver) when used reactively while cancelling an unfinished Reactor subscription #1230

@stepanv

Description


We detected a deadlock in our production setup between two threads that acquire the same two monitors in opposite order.

Here are the two call stacks from a thread dump we collected:

"default-nioEventLoopGroup-1-2" #46 prio=5 os_prio=0 cpu=665086.46ms elapsed=268644.92s tid=0x00007f35e897f550 nid=0x31 waiting for monitor entry  [0x00007f35c1122000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler.cancel(BasicPullResponseHandler.java)
	- waiting to lock <0x000000072488f200> (a org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler)
	at org.neo4j.driver.internal.cursor.RxResultCursorImpl.cancel(RxResultCursorImpl.java:103)
	at org.neo4j.driver.internal.reactive.InternalRxResult$$Lambda$1634/0x0000000801da7090.dispose(Unknown Source)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxCreate$SinkDisposable.cancel(FluxCreate.java:1039)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxCreate$BaseSink.disposeResource(FluxCreate.java:473)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxCreate$BaseSink.cancel(FluxCreate.java:462)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.StrictSubscriber.cancel(StrictSubscriber.java:155)
	at reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at reactor.core.publisher.StrictSubscriber.cancel(StrictSubscriber.java:155)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.cancel(FluxUsingWhen.java:326)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators$DeferredSubscription.cancel(Operators.java:1633)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxUsingWhen$ResourceSubscriber.cancel(FluxUsingWhen.java:248)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.cancel(FluxRetryWhen.java:163)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.StrictSubscriber.cancel(StrictSubscriber.java:155)
	at reactor.core.publisher.FluxUsing$UsingSubscriber.cancel(FluxUsing.java:176)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:174)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:174)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:174)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.dispose(FluxPublish.java:301)
	at reactor.core.publisher.FluxRefCount.cancel(FluxRefCount.java:106)
	at reactor.core.publisher.FluxRefCount$RefCountMonitor.innerCancelled(FluxRefCount.java:153)
	at reactor.core.publisher.FluxRefCount$RefCountInner.cancel(FluxRefCount.java:231)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:174)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:174)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at reactor.core.publisher.FluxZip$ZipInner.cancel(FluxZip.java:954)
	at reactor.core.publisher.FluxZip$ZipCoordinator.cancelAll(FluxZip.java:652)
	at reactor.core.publisher.FluxZip$ZipCoordinator.cancel(FluxZip.java:616)
	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.cancel(MonoCollectList.java:144)
	- locked <0x000000072488fa10> (a reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:130)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.cancel(FluxConcatArray.java:286)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:167)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.cancel(FluxPeek.java:153)
	at reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.cancel(FluxFlatMap.java:1022)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:340)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:219)
	at reactor.core.publisher.FlatMapTracker.unsubscribe(FluxFlatMap.java:1083)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:360)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.cancel(FluxDoFinally.java:151)
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:167)
	at reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:131)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.cancel(FluxPeek.java:153)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.cancel(FluxTimeout.java:251)
	at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157)
	at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.cancel(FluxSubscribeOn.java:203)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.cancel(FluxPeek.java:153)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.cancel(FluxPeek.java:153)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.cancel(FluxDoFinally.java:151)
	at reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.cancel(FluxFlatMap.java:1022)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:340)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:219)
	at reactor.core.publisher.FlatMapTracker.unsubscribe(FluxFlatMap.java:1083)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:360)
	at reactor.core.publisher.FluxTakeUntilOther$TakeUntilMainSubscriber.cancelMain(FluxTakeUntilOther.java:182)
	at reactor.core.publisher.FluxTakeUntilOther$TakeUntilMainSubscriber.cancel(FluxTakeUntilOther.java:199)
	at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:167)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
	at reactor.core.publisher.Operators.terminate(Operators.java:1240)
	at reactor.core.publisher.StrictSubscriber.cancel(StrictSubscriber.java:155)
	at io.micronaut.http.netty.reactive.HandlerSubscriber.cancel(HandlerSubscriber.java:239)
	at io.micronaut.http.netty.reactive.HandlerSubscriber.channelInactive(HandlerSubscriber.java:141)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at io.netty.handler.codec.http.HttpContentDecoder.channelInactive(HttpContentDecoder.java:235)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at io.netty.handler.codec.http.HttpContentEncoder.channelInactive(HttpContentEncoder.java:313)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.handler.flow.FlowControlHandler.channelInactive(FlowControlHandler.java:134)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelInactive(CombinedChannelDuplexHandler.java:418)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:357)
	at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:221)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17/Thread.java:833)

"Neo4jDriverIO-2-3" #62 daemon prio=10 os_prio=0 cpu=459229.04ms elapsed=268640.21s tid=0x00007f35beb0dfd0 nid=0x3f waiting for monitor entry  [0x00007f35bcefc000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onNext(MonoCollectList.java:90)
	- waiting to lock <0x000000072488fa10> (a reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:756)
	at reactor.core.publisher.FluxZip$ZipInner.onNext(FluxZip.java:915)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:477)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:268)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxUsing$UsingSubscriber.onNext(FluxUsing.java:202)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345)
	at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:618)
	at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:154)
	at org.neo4j.driver.internal.reactive.InternalRxResult.lambda$createRecordConsumer$3(InternalRxResult.java:95)
	at org.neo4j.driver.internal.reactive.InternalRxResult$$Lambda$1633/0x0000000801da6e58.accept(Unknown Source)
	at org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler.handleRecord(BasicPullResponseHandler.java:134)
	at org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler$State$2.onRecord(BasicPullResponseHandler.java:308)
	at org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler.onRecord(BasicPullResponseHandler.java:89)
	- locked <0x000000072488f200> (a org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler)
	at org.neo4j.driver.internal.handlers.RoutingResponseHandler.onRecord(RoutingResponseHandler.java:69)
	at org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleRecordMessage(InboundMessageDispatcher.java:114)
	at org.neo4j.driver.internal.messaging.common.CommonMessageReader.unpackRecordMessage(CommonMessageReader.java:94)
	at org.neo4j.driver.internal.messaging.common.CommonMessageReader.read(CommonMessageReader.java:65)
	at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:83)
	at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:35)
	at org.neo4j.driver.internal.shaded.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
	at org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:47)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.neo4j.driver.internal.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.neo4j.driver.internal.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at org.neo4j.driver.internal.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at org.neo4j.driver.internal.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17/Thread.java:833)


The dump shows that:

  • thread Neo4jDriverIO-2-3 is emitting records; while running the onNext chain it first locks the BasicPullResponseHandler monitor (<0x000000072488f200>) in onRecord() and then tries to lock the MonoCollectListSubscriber monitor (<0x000000072488fa10>) in onNext();
  • thread default-nioEventLoopGroup-1-2 is reacting to an external event (here, Netty signalling a channel close, which in turn makes Micronaut cancel the reactive chain); it locks the MonoCollectListSubscriber monitor in cancel() and then tries to lock the BasicPullResponseHandler monitor in BasicPullResponseHandler.cancel().

Because each thread holds the monitor the other one is waiting for, the two threads deadlock.
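The cycle described above is a classic lock-ordering inversion. The following minimal Java sketch reproduces the same shape without Neo4j, Reactor or a debugger, under these assumptions: two plain objects are hypothetical stand-ins for the BasicPullResponseHandler and MonoCollectListSubscriber monitors (they are not the real classes), a CountDownLatch forces the interleaving that the debugger breakpoint forces in the reproducing test, and ThreadMXBean.findMonitorDeadlockedThreads() is used to confirm the deadlock.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

public class LockOrderInversion {

    /**
     * Starts two daemon threads that take two monitors in opposite order and
     * returns how many monitor-deadlocked threads the JVM reports (0 if none within ~5 s).
     */
    public static int demonstrate() {
        // Hypothetical stand-ins for the two monitors seen in the thread dump
        Object pullHandlerMonitor = new Object();   // BasicPullResponseHandler
        Object collectListMonitor = new Object();   // MonoCollectListSubscriber
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        // Like Neo4jDriverIO-2-3: onRecord() holds the handler, then onNext() wants the subscriber
        Thread driverIo = new Thread(() -> {
            synchronized (pullHandlerMonitor) {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (collectListMonitor) {
                    System.out.println("driver-io: never reached");
                }
            }
        }, "driver-io");

        // Like default-nioEventLoopGroup-1-2: cancel() holds the subscriber, then wants the handler
        Thread canceller = new Thread(() -> {
            synchronized (collectListMonitor) {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (pullHandlerMonitor) {
                    System.out.println("canceller: never reached");
                }
            }
        }, "canceller");

        // Daemon threads, so the deadlocked pair does not keep the JVM alive
        driverIo.setDaemon(true);
        canceller.setDaemon(true);
        driverIo.start();
        canceller.start();

        // Poll the JVM's deadlock detector for up to ~5 seconds
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (int attempt = 0; attempt < 100; attempt++) {
            long[] deadlocked = threads.findMonitorDeadlockedThreads();
            if (deadlocked != null) {
                return deadlocked.length;
            }
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return 0;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Deadlocked threads: " + demonstrate());
    }
}
```

Running main should report the two stand-in threads as deadlocked, just as the real thread dump shows Neo4jDriverIO-2-3 and default-nioEventLoopGroup-1-2 blocked on each other's monitors.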

  • Neo4j version: Enterprise 4.4.3
  • Neo4j Mode: Single instance
  • Driver version: Java driver 4.4.3
  • Operating system: reproduced locally on macOS with Neo4j in Docker; originally detected on AWS in a Kubernetes, Alpine-based Java container

Steps to reproduce

  1. Start Neo4j in Docker
  2. Prepare the following test:
import org.junit.jupiter.api.Test;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Config;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Logging;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.reactive.RxSession;
import reactor.core.publisher.Flux;

import java.time.Duration;

/**
 * Put a breakpoint in
 *
 * reactor.core.publisher.MonoCollectList.MonoCollectListSubscriber#onNext(java.lang.Object)
 *
 * and configure it to suspend only the current thread (not all threads).
 *
 * Start the test in debug mode. When the breakpoint is hit, wait about 10 seconds and then resume.
 * The main thread and the Neo4j driver thread then deadlock, so the test never finishes.
 * You have as much time as you need to collect the thread dump.
 */
class Neo4jDeadlockTest {

    @Test
    void neo4j_deadlock_withMonoList() {
        // Given
        Config config = Config.builder()
                .withoutEncryption()
                .withLogging(Logging.slf4j())
                .withEventLoopThreads(5)
                .build();

        try (var driver = GraphDatabase.driver("bolt://localhost:55006", AuthTokens.basic("neo4j", "test"), config)) {

            var publish = Flux.using(
                            // "neo4j" is the default database name (GraphDatabaseSettings.DEFAULT_DATABASE_NAME),
                            // so the test only needs the driver on the classpath, not the Neo4j server artifacts
                            () -> driver.rxSession(SessionConfig.forDatabase("neo4j")),
                            session -> session.readTransaction(tx -> Flux.from(tx.run("call db.ping()").records())),
                            // RxSession.close() returns a lazy Publisher; used as a plain cleanup callback
                            // it is never subscribed to, so the session is not actually closed here
                            RxSession::close
                    )
                    .collectList();

            try {
                // When
                System.out.println("Total count: " + publish.block(Duration.ofSeconds(5)).size());

            } catch (RuntimeException e) {
                // A timeout would end up here, but it never arrives because of the deadlock

                // Then
                throw new IllegalStateException("You will never receive this message because the main thread is in deadlock with the Neo4jDriver thread");

            }

            throw new IllegalStateException("You didn't hit the deadlock because you didn't pause the onNext signal");
        }
    }
}
  3. Put a breakpoint in reactor.core.publisher.MonoCollectList.MonoCollectListSubscriber#onNext(java.lang.Object) and set it to suspend only the current thread.

  4. Run the test in debug mode.

  5. When the breakpoint is hit, wait at least 6 seconds, until the main thread cancels the subscription because block(Duration.ofSeconds(5)) has timed out.

  6. Resume.

  7. The test never completes because the main thread and the Neo4j driver thread are deadlocked.

  8. Collect the thread dump.

This reproducer is purely synthetic: the breakpoint forces the two threads to interleave in exactly the wrong order, which ends in the deadlock.

In production on AWS we hit the same deadlock while running ordinary business logic (no breakpoints needed :) ).

Expected behavior

The Neo4j driver's BasicPullResponseHandler should not deadlock with Reactor primitives.
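One general way to meet this expectation, sketched here only as an illustration and not as the driver's actual fix, is the "open call" pattern: a component updates its own state under its monitor but releases that monitor before calling into foreign code such as a downstream onNext(). OpenCallHandler below is a hypothetical class, and a plain object stands in for the MonoCollectListSubscriber monitor. The scenario replays the interleaving from the dump (the canceller holds the subscriber monitor while calling cancel(), and the IO thread is blocked delivering a record) and completes, because the IO thread no longer holds the handler monitor while it waits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class OpenCallDemo {

    /** Hypothetical handler: guards its own state with a monitor but calls downstream outside it. */
    static final class OpenCallHandler {
        private final Object lock = new Object();
        private final Consumer<String> downstream;
        private boolean cancelled;

        OpenCallHandler(Consumer<String> downstream) {
            this.downstream = downstream;
        }

        void onRecord(String record) {
            synchronized (lock) {
                if (cancelled) {
                    return;             // state is checked under the handler monitor...
                }
            }
            downstream.accept(record);  // ...but the downstream call runs without holding it
        }

        void cancel() {
            synchronized (lock) {
                cancelled = true;
            }
        }
    }

    /** Replays the interleaving from the thread dump; returns true if both threads finish within 5 s. */
    static boolean runScenario() {
        Object collectListMonitor = new Object();   // stand-in for MonoCollectListSubscriber
        List<String> collected = new ArrayList<>();
        OpenCallHandler handler = new OpenCallHandler(record -> {
            synchronized (collectListMonitor) {     // like MonoCollectListSubscriber.onNext()
                collected.add(record);
            }
        });

        Thread driverIo = new Thread(() -> handler.onRecord("row-1"), "driver-io");
        Thread canceller = new Thread(() -> {
            synchronized (collectListMonitor) {     // like MonoCollectListSubscriber.cancel()
                driverIo.start();
                waitUntilBlocked(driverIo);         // driver-io now waits inside downstream.accept()
                handler.cancel();                   // succeeds: driver-io does not hold the handler lock
            }
        }, "canceller");
        driverIo.setDaemon(true);
        canceller.setDaemon(true);

        canceller.start();
        try {
            canceller.join(TimeUnit.SECONDS.toMillis(5));
            driverIo.join(TimeUnit.SECONDS.toMillis(5));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !canceller.isAlive() && !driverIo.isAlive();
    }

    /** Spins until the thread is blocked on a monitor, or about 5 seconds have passed. */
    private static void waitUntilBlocked(Thread t) {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (t.getState() != Thread.State.BLOCKED && System.nanoTime() < deadline) {
            Thread.onSpinWait();
        }
    }

    public static void main(String[] args) {
        System.out.println("Completed without deadlock: " + runScenario());
    }
}
```

A trade-off of this design is that a record may still reach the downstream after cancel() has returned; Reactive Streams rule 2.8 already requires a Subscriber to tolerate onNext signals after it has called cancel().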
