
ChannelSendOperator.WriteBarrier race condition in request(long) method leads to response being dropped #31865

Closed
asw12 opened this issue Dec 20, 2023 · 2 comments

Assignee: rstoyanchev
Labels: in: web (Issues in web modules (web, webmvc, webflux, websocket)) · status: backported (An issue that has been backported to maintenance branches) · type: bug (A general bug)
Milestone: 6.1.4

Comments
asw12 commented Dec 20, 2023

Affects: spring-web 6.1.2


Context

I am using spring-boot-starter-undertow and WebFlux.

Description

When a WebFlux controller @RequestMapping handler method returns a Publisher<T> of at least 2 elements and publishOn is applied to move processing to a Scheduler other than Undertow's XNIO threads, I observe a race condition in ChannelSendOperator.WriteBarrier#request(long n) between my thread and the XNIO thread that is processing onWritePossible from the channel selector. This sounds similar to the closed issue previously described in #21098.

I can reproduce this race condition simply by setting a thread-suspending breakpoint on line 292 of ChannelSendOperator and running, for example, the quick-and-dirty test case I threw together (continue from the breakpoint after the XNIO thread parks on the indicated object monitor).
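
For reference, a minimal handler of the kind described above might look like the sketch below. This is an illustrative reconstruction, not the actual test case referenced; the controller class, endpoint path, and element values are assumptions.

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

@RestController
public class RaceReproController {

	// Returns at least 2 elements and moves emission off Undertow's XNIO
	// threads via publishOn, which is the setup described above.
	@GetMapping("/race")
	public Flux<String> race() {
		return Flux.just("first", "second")
				.publishOn(Schedulers.boundedElastic());
	}
}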

The race happens when:

  1. my boundedElastic-1 thread has passed emitCachedSignals, writing the first element to the client and allowing the selector to propagate the "WritePossible" event on the XNIO thread, but has not yet executed this.state = State.READY_TO_WRITE.
  2. the XNIO thread has passed the State.READY_TO_WRITE check, where it would normally have requested more.

In this race condition, s.request(n) is never called, meaning the response never finishes sending.

@Override
public void request(long n) {
	Subscription s = this.subscription;
	if (s == null) {
		return;
	}
	if (this.state == State.READY_TO_WRITE) {
		s.request(n);
		return;
	}
	synchronized (this) {                                        // (2) "XNIO-1 I/O-#" thread is parked here, past the READY_TO_WRITE check.
		if (this.writeSubscriber != null) {
			if (this.state == State.EMITTING_CACHED_SIGNALS) {
				this.demandBeforeReadyToWrite = n;
				return;
			}
			try {
				this.state = State.EMITTING_CACHED_SIGNALS;
				if (emitCachedSignals()) {
					return;
				}
				n = n + this.demandBeforeReadyToWrite - 1;
				if (n == 0) {
					return;                     // Both threads will reach this point and return without requesting more elements.
				}
			}
			finally {
				this.state = State.READY_TO_WRITE;  // (1) "boundedElastic-1" has not finished this statement
			}
		}
	}
	s.request(n);
}

Thread Dump

The two relevant threads:

"boundedElastic-1@6727" daemon prio=5 tid=0x32 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	 blocks XNIO-1 I/O-13@6083
	  at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.request(ChannelSendOperator.java:292)
	  - locked <0x1aa4> (a org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier)
	  at org.springframework.http.server.reactive.AbstractListenerWriteProcessor$State$1.onSubscribe(AbstractListenerWriteProcessor.java:361)
	  at org.springframework.http.server.reactive.AbstractListenerWriteProcessor.onSubscribe(AbstractListenerWriteProcessor.java:111)
	  at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.subscribe(ChannelSendOperator.java:358)
	  at org.springframework.http.server.reactive.AbstractListenerWriteFlushProcessor$State$2.onNext(AbstractListenerWriteFlushProcessor.java:293)
	  at org.springframework.http.server.reactive.AbstractListenerWriteFlushProcessor.onNext(AbstractListenerWriteFlushProcessor.java:120)
	  at org.springframework.http.server.reactive.AbstractListenerWriteFlushProcessor.onNext(AbstractListenerWriteFlushProcessor.java:43)
	  at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	  at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
	  at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
	  at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:649)
	  at reactor.core.publisher.StrictSubscriber.onSubscribe(StrictSubscriber.java:77)
	  at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:633)
	  at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
	  at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	  at org.springframework.http.server.reactive.AbstractListenerServerHttpResponse.lambda$writeAndFlushWithInternal$0(AbstractListenerServerHttpResponse.java:64)
	  at org.springframework.http.server.reactive.AbstractListenerServerHttpResponse$$Lambda$926/0x00000001005fdd18.subscribe(Unknown Source:-1)
	  at reactor.core.publisher.MonoFromPublisher.subscribe(MonoFromPublisher.java:64)
	  at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
	  at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:258)
	  at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:78)
	  at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	  at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:192)
	  at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
	  at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	  at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
	  at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
	  at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
	  at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runSync(FluxPublishOn.java:366)
	  at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:524)
	  at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	  at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	  at java.util.concurrent.FutureTask.run(FutureTask.java:264)
	  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	  at java.lang.Thread.run(Thread.java:840)


"XNIO-1 I/O-13@6083" prio=5 tid=0x2d nid=NA waiting for monitor entry
  java.lang.Thread.State: BLOCKED
	 waiting for boundedElastic-1@6727 to release lock on <0x1aa4> (a org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier)
	  at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.request(ChannelSendOperator.java:276)
	  at org.springframework.http.server.reactive.AbstractListenerWriteProcessor$State$3.onWritePossible(AbstractListenerWriteProcessor.java:415)
	  at org.springframework.http.server.reactive.AbstractListenerWriteProcessor.onWritePossible(AbstractListenerWriteProcessor.java:158)
	  at org.springframework.http.server.reactive.UndertowServerHttpResponse$ResponseBodyProcessor.lambda$new$0(UndertowServerHttpResponse.java:179)
	  at org.springframework.http.server.reactive.UndertowServerHttpResponse$ResponseBodyProcessor$$Lambda$929/0x0000000100602390.handleEvent(Unknown Source:-1)
	  at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
	  at io.undertow.channels.DetachableStreamSinkChannel$SetterDelegatingListener.handleEvent(DetachableStreamSinkChannel.java:285)
	  at io.undertow.channels.DetachableStreamSinkChannel$SetterDelegatingListener.handleEvent(DetachableStreamSinkChannel.java:272)
	  at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
	  at org.xnio.conduits.WriteReadyHandler$ChannelListenerHandler.writeReady(WriteReadyHandler.java:65)
	  at org.xnio.nio.NioSocketConduit.handleReady(NioSocketConduit.java:94)
	  at org.xnio.nio.WorkerThread.run(WorkerThread.java:591)

Speculation

In the other WriteBarrier methods that synchronize on this (onNext, onError, onComplete), I noticed that double-checked locking is used for this.state == State.READY_TO_WRITE. Would it be correct to add the same check to the request method, so that we still make a request upstream when we hit this race condition?
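
For illustration, the double-checked locking being proposed might look like the sketch below. Only the relevant part of request is shown, and this is not necessarily the exact change that was committed: the READY_TO_WRITE check is simply repeated at the top of the synchronized block.

@Override
public void request(long n) {
	Subscription s = this.subscription;
	if (s == null) {
		return;
	}
	if (this.state == State.READY_TO_WRITE) {
		s.request(n);
		return;
	}
	synchronized (this) {
		// Re-check under the lock: the writing thread may have finished
		// emitCachedSignals() and set READY_TO_WRITE while this thread was
		// parked on the monitor, in which case the demand must still be
		// forwarded upstream.
		if (this.state == State.READY_TO_WRITE) {
			s.request(n);
			return;
		}
		// ... remainder of the existing synchronized block unchanged ...
	}
	s.request(n);
}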

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged or decided on label Dec 20, 2023
@asw12 asw12 changed the title ChannelSendOperator.WriteBarrier race condition in request(long) method leads to deadlock in sending response ChannelSendOperator.WriteBarrier race condition in request(long) method leads to response being dropped Dec 20, 2023
@jhoeller jhoeller added the in: web Issues in web modules (web, webmvc, webflux, websocket) label Jan 3, 2024
@jhoeller jhoeller added type: bug A general bug and removed status: waiting-for-triage An issue we've not yet triaged or decided on labels Jan 12, 2024
@jhoeller jhoeller added this to the 6.1.4 milestone Jan 12, 2024
jhoeller (Contributor) commented:

@rstoyanchev reviewing the implementation of the related methods, it seems sensible to add double-checked locking here as well, simply duplicating the READY_TO_WRITE/request(n) check at the beginning of the synchronized block, analogous to the other methods. Do you see any disadvantage there? Also, I suppose this is worth backporting to 6.0.x and 5.3.x as well.

@rstoyanchev rstoyanchev self-assigned this Jan 12, 2024
rstoyanchev (Contributor) commented:

Thanks for the detailed analysis. It makes sense to have a similar READY_TO_WRITE check inside the synchronized block in request as well, in case the server is quick in coming back to request more from a separate thread.

@rstoyanchev rstoyanchev added for: backport-to-5.3.x Marks an issue as a candidate for backport to 5.3.x for: backport-to-6.0.x Marks an issue as a candidate for backport to 6.0.x labels Jan 12, 2024
@github-actions github-actions bot added status: backported An issue that has been backported to maintenance branches and removed for: backport-to-6.0.x Marks an issue as a candidate for backport to 6.0.x labels Jan 12, 2024
@github-actions github-actions bot removed the for: backport-to-5.3.x Marks an issue as a candidate for backport to 5.3.x label Jan 12, 2024