
BufferTimeoutSubscriber is not thread safe #3738

Open
jamiestewart opened this issue Mar 4, 2024 · 2 comments
Labels: type/bug A general bug
Milestone: 3.4.x Backlog

Comments


jamiestewart commented Mar 4, 2024

This issue seems to be very similar in cause and solution to reactor/reactor-core#2362. I recommend that you read that issue before continuing further here.

Like reactor/reactor-core#2362, synchronization is present around many of BufferTimeoutSubscriber's operations on its buffer, but BufferTimeoutSubscriber.cancel() calls Operators.onDiscardMultiple() (which iterates through the buffer) with no synchronization, so another thread is not prevented from modifying the buffer while the cancel is underway. This can cause multiple failures.

The most problematic of these failures is an ArrayIndexOutOfBoundsException, thrown from:

   at java.base/java.util.ArrayList.clear(ArrayList.java:653)
   at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.cancel(FluxBufferTimeout.java:383)

This can occur when one thread is modifying the buffer via onNext() while another thread calls cancel(): the cancelling thread is calling List.clear() while the other thread is still adding to the list.

Somewhat more common is a ConcurrentModificationException, thrown when the cancelling thread is iterating through the buffer while another thread modifies it. In anticipation of this problem, Operators.onDiscardMultiple() catches this exception and logs it at WARN level, which prevents it from propagating, but it can be worrisome for application administrators, and it prevents some discarded elements from being passed to the discard hook. Below is an example of the logged ConcurrentModificationException:

2024-02-28 15:58:02.483  WARN 38878 --- [tor-http-nio-41] reactor.core.publisher.Operators         : Error while discarding collection, stopping

java.util.ConcurrentModificationException: null
	at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1013) ~[na:na]
	at java.base/java.util.ArrayList$Itr.next(ArrayList.java:967) ~[na:na]
	at reactor.core.publisher.Operators.onDiscardMultiple(Operators.java:567) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.cancel(FluxBufferTimeout.java:382) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.cancel(FluxPublishOn.java:277) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:176) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.Operators.terminate(Operators.java:1240) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:131) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at org.springframework.messaging.handler.invocation.reactive.ChannelSendOperator$WriteBarrier.cancel(ChannelSendOperator.java:335) ~[spring-messaging-5.3.22.jar!/:5.3.22]
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.cancel(FluxConcatArray.java:286) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.Operators.terminate(Operators.java:1240) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:131) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at io.rsocket.core.RequestStreamResponderSubscriber.handleCancel(RequestStreamResponderSubscriber.java:300) ~[rsocket-core-1.1.2.jar!/:na]
	at io.rsocket.core.RSocketResponder.cleanUpSendingSubscriptions(RSocketResponder.java:193) ~[rsocket-core-1.1.2.jar!/:na]
	at io.rsocket.core.RSocketResponder.doOnDispose(RSocketResponder.java:172) ~[rsocket-core-1.1.2.jar!/:na]
	at io.rsocket.core.RSocketResponder.tryTerminate(RSocketResponder.java:106) ~[rsocket-core-1.1.2.jar!/:na]
	at io.rsocket.core.RSocketResponder.tryTerminateOnConnectionClose(RSocketResponder.java:99) ~[rsocket-core-1.1.2.jar!/:na]
	at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:135) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:238) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46) ~[reactor-core-3.4.21.jar!/:3.4.21]
	at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51) ~[rsocket-core-1.1.2.jar!/:na]
	at io.rsocket.transport.netty.WebsocketDuplexConnection.lambda$new$0(WebsocketDuplexConnection.java:54) ~[rsocket-transport-netty-1.1.2.jar!/:na]
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1164) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:755) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:731) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:620) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1352) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:472) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:957) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannel.close(AbstractChannel.java:244) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at reactor.netty.DisposableChannel.dispose(DisposableChannel.java:72) ~[reactor-netty-core-1.0.21.jar!/:1.0.21]
	at reactor.netty.channel.ChannelOperations.dispose(ChannelOperations.java:203) ~[reactor-netty-core-1.0.21.jar!/:1.0.21]
	at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:474) ~[reactor-netty-core-1.0.21.jar!/:1.0.21]
	at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:478) ~[reactor-netty-core-1.0.21.jar!/:1.0.21]
	at reactor.netty.http.server.WebsocketServerOperations.lambda$onInboundNext$2(WebsocketServerOperations.java:158) ~[reactor-netty-http-1.0.21.jar!/:1.0.21]
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at reactor.netty.http.server.WebsocketServerOperations.sendCloseNow(WebsocketServerOperations.java:262) ~[reactor-netty-http-1.0.21.jar!/:1.0.21]
	at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:158) ~[reactor-netty-http-1.0.21.jar!/:1.0.21]
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-core-1.0.21.jar!/:1.0.21]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler.channelRead(BaseWebsocketServerTransport.java:60) ~[rsocket-transport-netty-1.1.2.jar!/:na]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[netty-codec-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) ~[netty-codec-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na] 

It looks like reactor.core.publisher.FluxBufferTimeout.BufferTimeoutSubscriber.cancel() violates the reactive-streams specification:

2.8 A Subscriber MUST be prepared to receive one or more onNext signals after having called Subscription.cancel() if there are still requested elements pending [see 3.12]. Subscription.cancel() does not guarantee to perform the underlying cleaning operations immediately.

BufferTimeoutSubscriber.cancel is clearly not thread safe with respect to onNext: if one thread runs cancel, iterating over the buffer to dispose of its elements (via Operators.onDiscardMultiple), while another thread calls onNext to add new elements to the buffer, a ConcurrentModificationException is thrown (at least when the underlying buffer is an ArrayList, which it is by default).
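The underlying ArrayList failure mode can be demonstrated deterministically even without threads, because ArrayList's iterator is fail-fast: any structural modification during iteration throws on the next iterator call. A minimal sketch (plain Java, not Reactor; the class and method names here are illustrative only):

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

public class FailFastDemo {

    // Returns true if iterating the list while appending to it throws
    // ConcurrentModificationException, mirroring the cancel()/onNext() race.
    static boolean modifyWhileIterating() {
        List<String> buffer = new ArrayList<>(List.of("a", "b", "c"));
        try {
            for (String element : buffer) { // like cancel(): iterate to discard
                buffer.add("new element");  // like onNext(): append concurrently
            }
        } catch (ConcurrentModificationException e) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + modifyWhileIterating()); // prints "CME thrown: true"
    }
}
```

In the real race the two operations run on different threads, so the exception is intermittent rather than guaranteed; the single-threaded form above just makes the fail-fast mechanism visible.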

3.15 Calling Subscription.cancel MUST return normally.

Expected Behavior

BufferTimeoutSubscriber.cancel MUST return normally, even if another thread is calling BufferTimeoutSubscriber.onNext.

Actual Behavior

On occasion, BufferTimeoutSubscriber.cancel is observed to throw ConcurrentModificationException if invoked while another thread is calling BufferTimeoutSubscriber.onNext.

Steps to Reproduce

Below is an imperfect unit test for exercising this behavior.

package reactor.core.publisher;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.publisher.FluxBufferTimeout.BufferTimeoutSubscriber;
import reactor.core.scheduler.Scheduler;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;

class BufferTimeoutSubscriberConcurrency {

    private AutoCloseable mockingFramework;

    @Mock private CoreSubscriber<ArrayList<String>> actual;
    @Mock private Context context;
    @Mock private Consumer<Object> hook;
    @Mock private Scheduler.Worker timer;
    @Mock private Subscription s;
    @Mock private Logger logger;

    @BeforeEach
    public void setup() {
        mockingFramework = MockitoAnnotations.openMocks(this);

        // Instruct Reactor to use a mock logger
        Loggers.useCustomLoggers(loggerName -> logger);

        when(actual.currentContext()).thenReturn(context);
        when(context.getOrDefault(any(), any())).thenReturn(hook);
    }

    @AfterEach
    public void cleanup() throws Exception {
        mockingFramework.close();
    }

    @Test
    void test() throws InterruptedException {
        for (int index = 0; index < 1_000_000 ; index++) {
            BufferTimeoutSubscriber<String, ArrayList<String>> subscriber = new BufferTimeoutSubscriber(actual,
                    1_000_000, 1, TimeUnit.HOURS, timer, ArrayList::new);

            // Subscribe and request so that onNext calls are accepted
            subscriber.onSubscribe(s);
            subscriber.request(5_000);

            var thread1 = new Thread(() -> subscriber.cancel());
            var thread2 = new Thread(() -> {
                while (thread1.isAlive()) {
                    subscriber.onNext("an element");
                }
            });
            var threads = List.of(thread1, thread2);
            threads.forEach(Thread::start);
            for (var thread : threads) {
                thread.join();
            }
        }

        verify(logger, never()).warn(any());
    }
}

Possible Solution

As was done in the fix for reactor/reactor-core#2362, synchronizing the implementation of BufferTimeoutSubscriber.cancel on this would address the problem.
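To illustrate the shape of that fix, here is a simplified, self-contained model (not the actual FluxBufferTimeout code; the class name SyncedBufferModel and its methods are hypothetical): when the discard loop in cancel runs under the same monitor that guards buffer mutation in onNext, the iterator can no longer observe a concurrent structural modification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of the proposed fix: buffer mutation (onNext) and the
// discard iteration (cancel) both synchronize on the same monitor, so the
// discard loop cannot race with an append.
public class SyncedBufferModel {
    private final List<String> buffer = new ArrayList<>();
    private final Consumer<String> discardHook;
    private boolean cancelled;

    SyncedBufferModel(Consumer<String> discardHook) {
        this.discardHook = discardHook;
    }

    synchronized void onNext(String element) {
        if (!cancelled) {
            buffer.add(element); // safe: holds the same lock as cancel()
        }
        // Elements arriving after cancellation would be routed straight to
        // the discard hook in a full implementation; dropped here for brevity.
    }

    synchronized void cancel() {
        cancelled = true;
        for (String element : buffer) { // safe: no concurrent structural change
            discardHook.accept(element);
        }
        buffer.clear();
    }

    synchronized int size() {
        return buffer.size();
    }
}
```

With this structure, a stress loop like the unit test above completes without ConcurrentModificationException, at the cost of cancel briefly blocking onNext while the buffer is discarded.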

My Environment

Reactor version(s) used: 3.4.21
JVM version (java -version):
openjdk version "17.0.2"
openjdk 17.0.2 2022-01-18
OpenJDK Runtime Environment (build 17.0.2+8-86)
OpenJDK 64-Bit Server VM (build 17.0.2+8-86, mixed mode, sharing)
OS and version (eg uname -a):
Darwin jstewart-mb01 23.3.0 Darwin Kernel Version 23.3.0: Wed Dec 20 21:30:44 PST 2023; root:xnu-10002.81.5~7/RELEASE_ARM64_T6000 arm64

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Mar 4, 2024
@chemicL chemicL added type/bug A general bug and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Mar 5, 2024
@chemicL chemicL added this to the 3.4.x Backlog milestone Mar 5, 2024

chemicL commented Mar 5, 2024

Thank you for the report @jamiestewart 👍
I can confirm this is a bug in the existing implementation. I need to give some thought to the next steps.

There is another implementation of the bufferTimeout() operator that honours backpressure. Can you try that one? It also has a known concurrency deficiency, but a fix for it is planned.

Regarding this issue, we'd need:

  • a JCStress test
  • JMH benchmark for the proposed solution
  • and of course, the implementation

Are you considering a contribution, @jamiestewart?


He-Pin commented Mar 20, 2024

Is it possible for you to upgrade to a newer version, @jamiestewart? Is this problem still present?
