Skip to content

Commit

Permalink
Merge #3053 into 3.5.0-M3
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Jun 7, 2022
2 parents 686b2b3 + 42d21d8 commit 17adf0f
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 15 deletions.
21 changes: 11 additions & 10 deletions reactor-core/src/main/java/reactor/core/publisher/MonoCollect.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import java.util.function.Supplier;

import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
Expand Down Expand Up @@ -175,19 +176,19 @@ protected void discard(R v) {
public void cancel() {
int state;
R c;
synchronized (this) {
state = STATE.getAndSet(this, CANCELLED);
if (state != CANCELLED) {
s.cancel();
}
if (state <= HAS_REQUEST_NO_VALUE) {
state = STATE.getAndSet(this, CANCELLED);
if (state != CANCELLED) {
s.cancel();
}
if (state <= HAS_REQUEST_NO_VALUE) {
synchronized (this) {
c = container;
this.value = null;
container = null;
}
else {
c = null;
}
}
else {
c = null;
}
if (c != null) {
discard(c);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import java.util.List;

import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
Expand Down Expand Up @@ -138,11 +139,11 @@ protected void discard(List<T> v) {
public void cancel() {
int state;
List<T> l;
state = STATE.getAndSet(this, CANCELLED);
if (state != CANCELLED) {
s.cancel();
}
synchronized (this) {
state = STATE.getAndSet(this, CANCELLED);
if (state != CANCELLED) {
s.cancel();
}
if (state <= HAS_REQUEST_NO_VALUE) {
l = list;
this.value = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
Expand All @@ -36,6 +39,7 @@
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
import reactor.test.subscriber.AssertSubscriber;
import reactor.test.subscriber.TestSubscriber;
import reactor.test.util.RaceTestUtils;
import reactor.util.Logger;
import reactor.util.Loggers;
Expand Down Expand Up @@ -316,4 +320,50 @@ void cancelPropagatesEvenOnEmptySource() {
});
}

// https://github.com/reactor/reactor-core/issues/3052
@Test
@Tag("slow")
@Timeout(30)
void deadlockCancelOnNext() throws InterruptedException {
for (int i = 0; i < 1_000; i++) {
CoreSubscriber<? super List<Integer>> testSubscriber = TestSubscriber.create();
MonoCollectList.MonoCollectListSubscriber<Integer> subscriber = new MonoCollectListSubscriber<>(testSubscriber);
CountDownLatch latch = new CountDownLatch(2);
Subscription synchronizedSubscription = new Subscription() {

boolean cancelled;

public void request(long n) {
if (n != 123) return; //hack to only consider the request from RaceTestUtils
synchronized (this) {
if (cancelled) {
return;
}
subscriber.onNext(1);
}
}

@Override
public void cancel() {
synchronized (this) {
this.cancelled = true;
}
}
};
subscriber.onSubscribe(synchronizedSubscription);

RaceTestUtils.race(
() -> {
synchronizedSubscription.request(123);
latch.countDown();
},
() -> {
subscriber.cancel();
latch.countDown();
}
);
assertThat(latch.await(100, TimeUnit.MILLISECONDS)).as("latch").isTrue();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.reactivestreams.Subscription;
import org.slf4j.LoggerFactory;
Expand All @@ -39,6 +43,7 @@
import reactor.test.AutoDisposingExtension;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;
import reactor.test.subscriber.TestSubscriber;
import reactor.test.util.RaceTestUtils;
import reactor.util.Logger;
import reactor.util.Loggers;
Expand Down Expand Up @@ -396,4 +401,50 @@ void cancelPropagatesEvenOnEmptySource() {
softly.assertThat(cancel2).as("cancel2").isTrue();
});
}

// https://github.com/reactor/reactor-core/issues/3052
@Test
@Tag("slow")
@Timeout(30)
void deadlockCancelOnNext() throws InterruptedException {
for (int i = 0; i < 1_000; i++) {
CoreSubscriber<? super List<Integer>> testSubscriber = TestSubscriber.create();
MonoCollect.CollectSubscriber<Integer, ArrayList<Integer>> subscriber = new MonoCollect.CollectSubscriber<>(testSubscriber, ArrayList::add, new ArrayList<>());
CountDownLatch latch = new CountDownLatch(2);
Subscription synchronizedSubscription = new Subscription() {

boolean cancelled;

public void request(long n) {
if (n != 123) return; //hack to only consider the request from RaceTestUtils
synchronized (this) {
if (cancelled) {
return;
}
subscriber.onNext(1);
}
}

@Override
public void cancel() {
synchronized (this) {
this.cancelled = true;
}
}
};
subscriber.onSubscribe(synchronizedSubscription);

RaceTestUtils.race(
() -> {
synchronizedSubscription.request(123);
latch.countDown();
},
() -> {
subscriber.cancel();
latch.countDown();
}
);
assertThat(latch.await(100, TimeUnit.MILLISECONDS)).as("latch").isTrue();
}
}
}

0 comments on commit 17adf0f

Please sign in to comment.