Skip to content

Commit

Permalink
Fix cancel too synchronized in MonoCollect[List] (#3053)
Browse files Browse the repository at this point in the history
This commit reduces the length of the critical section in MonoCollectList and
MonoCollect cancel() method that is synchronized.
In particular, it removes upstream.cancel() from said critical section.

This will prevent a deadlock in case the upstream operator protects a resource
using synchronization in both its cancel() method and in any other method that
calls the collect subscriber downstream (typically onNext()).

By making the critical section shorter, we ensure that the relevant fields are
read / written to / nulled out correctly while avoiding synchronization on the
calls to the upstream / downstream, hence restoring progress.

Fixes #3052.
  • Loading branch information
simonbasle committed Jun 7, 2022
1 parent b07f391 commit 42d21d8
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 17 deletions.
21 changes: 11 additions & 10 deletions reactor-core/src/main/java/reactor/core/publisher/MonoCollect.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import java.util.function.Supplier;

import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
Expand Down Expand Up @@ -175,19 +176,19 @@ protected void discard(R v) {
public void cancel() {
int state;
R c;
synchronized (this) {
state = STATE.getAndSet(this, CANCELLED);
if (state != CANCELLED) {
s.cancel();
}
if (state <= HAS_REQUEST_NO_VALUE) {
state = STATE.getAndSet(this, CANCELLED);
if (state != CANCELLED) {
s.cancel();
}
if (state <= HAS_REQUEST_NO_VALUE) {
synchronized (this) {
c = container;
this.value = null;
container = null;
}
else {
c = null;
}
}
else {
c = null;
}
if (c != null) {
discard(c);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import java.util.List;

import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
Expand Down Expand Up @@ -138,11 +139,11 @@ protected void discard(List<T> v) {
public void cancel() {
int state;
List<T> l;
state = STATE.getAndSet(this, CANCELLED);
if (state != CANCELLED) {
s.cancel();
}
synchronized (this) {
state = STATE.getAndSet(this, CANCELLED);
if (state != CANCELLED) {
s.cancel();
}
if (state <= HAS_REQUEST_NO_VALUE) {
l = list;
this.value = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,13 +20,16 @@
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
Expand All @@ -36,6 +39,7 @@
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
import reactor.test.subscriber.AssertSubscriber;
import reactor.test.subscriber.TestSubscriber;
import reactor.test.util.RaceTestUtils;
import reactor.util.Logger;
import reactor.util.Loggers;
Expand Down Expand Up @@ -316,4 +320,50 @@ void cancelPropagatesEvenOnEmptySource() {
});
}

// https://github.com/reactor/reactor-core/issues/3052
@Test
@Tag("slow")
@Timeout(30)
void deadlockCancelOnNext() throws InterruptedException {
for (int i = 0; i < 1_000; i++) {
CoreSubscriber<? super List<Integer>> testSubscriber = TestSubscriber.create();
MonoCollectList.MonoCollectListSubscriber<Integer> subscriber = new MonoCollectListSubscriber<>(testSubscriber);
CountDownLatch latch = new CountDownLatch(2);
Subscription synchronizedSubscription = new Subscription() {

boolean cancelled;

public void request(long n) {
if (n != 123) return; //hack to only consider the request from RaceTestUtils
synchronized (this) {
if (cancelled) {
return;
}
subscriber.onNext(1);
}
}

@Override
public void cancel() {
synchronized (this) {
this.cancelled = true;
}
}
};
subscriber.onSubscribe(synchronizedSubscription);

RaceTestUtils.race(
() -> {
synchronizedSubscription.request(123);
latch.countDown();
},
() -> {
subscriber.cancel();
latch.countDown();
}
);
assertThat(latch.await(100, TimeUnit.MILLISECONDS)).as("latch").isTrue();
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,12 +21,16 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.reactivestreams.Subscription;
import org.slf4j.LoggerFactory;
Expand All @@ -39,6 +43,7 @@
import reactor.test.AutoDisposingExtension;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;
import reactor.test.subscriber.TestSubscriber;
import reactor.test.util.RaceTestUtils;
import reactor.util.Logger;
import reactor.util.Loggers;
Expand Down Expand Up @@ -396,4 +401,50 @@ void cancelPropagatesEvenOnEmptySource() {
softly.assertThat(cancel2).as("cancel2").isTrue();
});
}

// https://github.com/reactor/reactor-core/issues/3052
@Test
@Tag("slow")
@Timeout(30)
void deadlockCancelOnNext() throws InterruptedException {
for (int i = 0; i < 1_000; i++) {
CoreSubscriber<? super List<Integer>> testSubscriber = TestSubscriber.create();
MonoCollect.CollectSubscriber<Integer, ArrayList<Integer>> subscriber = new MonoCollect.CollectSubscriber<>(testSubscriber, ArrayList::add, new ArrayList<>());
CountDownLatch latch = new CountDownLatch(2);
Subscription synchronizedSubscription = new Subscription() {

boolean cancelled;

public void request(long n) {
if (n != 123) return; //hack to only consider the request from RaceTestUtils
synchronized (this) {
if (cancelled) {
return;
}
subscriber.onNext(1);
}
}

@Override
public void cancel() {
synchronized (this) {
this.cancelled = true;
}
}
};
subscriber.onSubscribe(synchronizedSubscription);

RaceTestUtils.race(
() -> {
synchronizedSubscription.request(123);
latch.countDown();
},
() -> {
subscriber.cancel();
latch.countDown();
}
);
assertThat(latch.await(100, TimeUnit.MILLISECONDS)).as("latch").isTrue();
}
}
}

0 comments on commit 42d21d8

Please sign in to comment.