Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe's completion handler possibly executed by the "wrong thread", leading to race conditions #5049

Open
sauthieg opened this issue Dec 22, 2023 · 2 comments
Labels
Milestone

Comments

@sauthieg
Copy link

Version

Vert.x 4.4.6, but likely earlier versions have the issue as well.

Context

It started with a suspicious NPE in our home-made Vert.x metrics plugin:

java.lang.NullPointerException: null
    at org.forgerock.http.vertx.monitoring.VertxHttpClientMetrics$HttpClientMetricsImpl$1.requestEnd(VertxHttpClientMetrics.java:120)
    at org.forgerock.http.vertx.monitoring.VertxHttpClientMetrics$HttpClientMetricsImpl$1.requestEnd(VertxHttpClientMetrics.java:94)
    at io.vertx.core.http.impl.Http1xClientConnection.endRequest(Http1xClientConnection.java:320)
    at io.vertx.core.http.impl.Http1xClientConnection.writeBuffer(Http1xClientConnection.java:307)
    at io.vertx.core.http.impl.Http1xClientConnection.access$1100(Http1xClientConnection.java:108)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.writeBuffer(Http1xClientConnection.java:603)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.writeBuffer(Http1xClientConnection.java:566)
    at io.vertx.core.http.impl.HttpClientRequestImpl.doWrite(HttpClientRequestImpl.java:506)
    at io.vertx.core.http.impl.HttpClientRequestImpl.write(HttpClientRequestImpl.java:471)
    at io.vertx.core.http.impl.HttpClientRequestImpl.end(HttpClientRequestImpl.java:417)
    at io.vertx.core.streams.impl.PipeImpl.handleSuccess(PipeImpl.java:116)
    at io.vertx.core.streams.impl.PipeImpl.lambda$to$2(PipeImpl.java:102)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
    at io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:196)
    at io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23)
    at io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java:164)
    at io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23)
    at io.vertx.core.streams.impl.PipeImpl.to(PipeImpl.java:88)
    at io.vertx.core.streams.ReadStream.pipeTo(ReadStream.java:119)
    at io.vertx.core.http.HttpClientRequest.send(HttpClientRequest.java:401)
    at io.vertx.rxjava3.core.http.HttpClientRequest.lambda$rxSend$24(HttpClientRequest.java:614)
   ...

Before you ask, yes I verified and we NEVER returns a null in metrics.beginRequest(...).

Then, I noticed this second exception that happens in the same request:

java.lang.IllegalStateException: unexpected message type: LastHttpContent$1, state: 0 
    at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:108) 
    at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:346) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940) 
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:966) 
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:934) 
    at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:180) 
    at io.vertx.core.net.impl.ConnectionBase.writeToChannel(ConnectionBase.java:232) 
    at io.vertx.core.net.impl.ConnectionBase.writeToChannel(ConnectionBase.java:217) 
    at io.vertx.core.net.impl.ConnectionBase.writeToChannel(ConnectionBase.java:213) 
    at io.vertx.core.http.impl.Http1xClientConnection.writeBuffer(Http1xClientConnection.java:305) 
    at io.vertx.core.http.impl.Http1xClientConnection.access$ 
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.writeBuffer(Http1xClientConnection.java:603) 
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.writeBuffer(Http1xClientConnection.java:566) 
    at io.vertx.core.http.impl.HttpClientRequestImpl.doWrite(HttpClientRequestImpl.java:506) 
    at io.vertx.core.http.impl.HttpClientRequestImpl.write(HttpClientRequestImpl.java:471) 
    at io.vertx.core.http.impl.HttpClientRequestImpl.end(HttpClientRequestImpl.java:417) 
    at io.vertx.core.streams.impl.PipeImpl.handleSuccess(PipeImpl.java:116) 
    at io.vertx.core.streams.impl.PipeImpl.lambda$ 
    at io.vertx.core.streams.impl.PipeImpl$$Lambda$.handle 
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141) 
    at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60) 
    at io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:196) 
    at io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23) 
    at io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java:164) 
    at io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23) 
    at io.vertx.core.streams.impl.PipeImpl.to(PipeImpl.java:88) 
    at io.vertx.core.streams.ReadStream.pipeTo(ReadStream.java:119) 
    at io.vertx.core.http.HttpClientRequest.send(HttpClientRequest.java:401) 
    at io.vertx.rxjava3.core.http.HttpClientRequest.lambda$ 

That ISE happens because the HttpObjectEncoder receive a LastHttpContent item, BEFORE been told to write the request's head (request line, headers, ...).

Analysis

The first stack is IMHO a consequence of the 2nd one: writing the last content fail, but the execution continues, ignoring the failure.
See

writeToChannel(msg, listener);
if (end) {
endRequest(s);
}
if writeToChannel() fails, we still execute endRequest().

Why that failure in the first place ?
I think it all comes down to the fact that the write operations coming from the Pipe are executed in a NON event loop thread (and therefore placed in a queue of operations), while the end operation is done by an event loop thread (and therefore executed immediately).

Do you have a reproducer?

See https://gist.github.com/sauthieg/7a9730c35da43042e7fbaf4d2a1ed1b0
That involves an RX ReadStreamSubscriber because that was what I have in my issue.

In short, that shows that when a ReadStreamSubscriber's source/upstream completes on another thread (writing 1 or more elements just before completion, still with the same thread), then the Pipe's completion might be executed by the event loop thread, potentially resulting in a race condition in the destination write stream.

That happens because:

  • PipeImpl.to() is being executed on the event loop thread
  • the upstream flowable is ready to emit all its elements (and complete) when the pipe executes src.resume()
  • upstream flowable emit on a non eventloop thread
  • upstream flowable onNext() executes the destination's WriteStream.write()(on the non event loop thread)
  • Http1xClientConnection.StreamImpl enforce that writeHead() is executed on the event loop, so that action is queued
  • upstream flowable calls ReadStreamSubscriber.onComplete, that marks the Pipe's result future as completed (but as the Pipe's result.future().onComplete(...) as NOT been executed, there is no handler yet to handle that action) (still on the non event loop thread)
  • finally, result.future().onComplete(...) is executed (by the event loop thread), and because the future is already completed, the handler is executed immediately (calling WriteStream.end())
  • that eventually executes Http1xClientConnection.StreamImpl.writeBuffer() (on the event loop), immediately trying to write that LastHttpContent item in he Netty channel
  • Boom
@sauthieg sauthieg added the bug label Dec 22, 2023
@sauthieg
Copy link
Author

Proposed solution

In PipeImpl.to(), set the completion handler BEFORE executing src.resume().
With the completion handler already in place BEFORE any content is conveyed through the Pipe, the upstream's completion event is going to be processed by the same thread that did the last dst.write(), ensuring Thread consistency and therefore avoiding race condition in downstream write stream.

NilsRenaud added a commit to NilsRenaud/vert.x that referenced this issue Dec 22, 2023
Fix issue: eclipse-vertx#5049

Signed-off-by: Nils Renaud <renaud.nils@gmail.com>
@vietj vietj added this to the 4.5.2 milestone Jan 8, 2024
@vietj
Copy link
Member

vietj commented Jan 8, 2024

thanks for the detailed report, I'll look into it

@vietj vietj modified the milestones: 4.5.2, 4.5.3 Jan 30, 2024
@vietj vietj modified the milestones: 4.5.3, 4.5.4 Feb 6, 2024
@vietj vietj modified the milestones: 4.5.4, 4.5.5 Feb 22, 2024
@vietj vietj modified the milestones: 4.5.5, 4.5.6 Mar 14, 2024
@vietj vietj modified the milestones: 4.5.6, 4.5.7, 4.5.8 Mar 21, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants