Skip to content

Commit

Permalink
Avoid FluxReplay retaining by progressively applying removals
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Apr 5, 2022
1 parent fd3cd3e commit 91910fd
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 35 deletions.
19 changes: 8 additions & 11 deletions reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,9 @@ public int capacity() {
@Override
public void add(T value) {
final TimedNode<T> tail = this.tail;
final long addedTime = scheduler.now(TimeUnit.NANOSECONDS);
final TimedNode<T> n = new TimedNode<>(tail.index + 1,
value,
addedTime);
scheduler.now(TimeUnit.NANOSECONDS));
tail.set(n);
this.tail = n;
int s = size;
Expand All @@ -445,7 +444,7 @@ public void add(T value) {
else {
size = s + 1;
}
long limit = addedTime - maxAge;
long limit = scheduler.now(TimeUnit.NANOSECONDS) - maxAge;

TimedNode<T> h = head;
TimedNode<T> next;
Expand All @@ -456,20 +455,18 @@ public void add(T value) {
break;
}

boolean atTailWhenZeroDuration = (maxAge == 0 && next.time == limit && next == n);
if (next.time > limit || atTailWhenZeroDuration) {
if (next.time > limit || next.get() == tail) {
if (removed != 0) {
size = size - removed;
head = h;
}
if (atTailWhenZeroDuration) {
size--;
TimedNode<T> empty = new TimedNode<>(-1, null, 0L);
head = empty;
this.tail = empty;
}
break;
}
else if (removed == 5000) {
size = size - removed;
removed = 0;
head = h;
}

h = next;
removed++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ public static <T> ReplayProcessor<T> createSizeAndTimeout(int size,
}

final FluxReplay.ReplayBuffer<T> buffer;
final boolean instantDeath;

Subscription subscription;

Expand All @@ -320,7 +319,6 @@ public static <T> ReplayProcessor<T> createSizeAndTimeout(int size,

ReplayProcessor(FluxReplay.ReplayBuffer<T> buffer) {
this.buffer = buffer;
this.instantDeath = buffer instanceof FluxReplay.SizeAndTimeBoundReplayBuffer && ((FluxReplay.SizeAndTimeBoundReplayBuffer) buffer).maxAge == 0L;
SUBSCRIBERS.lazySet(this, EMPTY);
}

Expand Down Expand Up @@ -507,9 +505,6 @@ public Sinks.EmitResult tryEmitNext(T t) {
//note: ReplayProcessor can so far ALWAYS buffer the element, no FAIL_ZERO_SUBSCRIBER here
b.add(t);
for (FluxReplay.ReplaySubscription<T> rs : subscribers) {
if (instantDeath) {
rs.actual().onNext(t);
}
b.replay(rs);
}
return Sinks.EmitResult.OK;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.assertj.core.data.Offset;
import org.assertj.core.data.Percentage;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.reactivestreams.Subscription;

Expand All @@ -48,19 +48,23 @@
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;

import static org.assertj.core.api.Assertions.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

public class FluxReplayTest extends FluxOperatorTest<String, String> {

@Nullable
VirtualTimeScheduler vts;

@BeforeEach
public void vtsStart() {
//delayElements (notably) now uses parallel() so VTS must be enabled everywhere
vts = VirtualTimeScheduler.getOrSet();
public void vtsStart(TestInfo testInfo) {
if (testInfo.getTags().contains("VirtualTime")) {
//delayElements (notably) now uses parallel() so VTS must be enabled everywhere
vts = VirtualTimeScheduler.getOrSet();
}
}

@AfterEach
Expand All @@ -72,27 +76,25 @@ public void vtsStop() {

@Test
void shouldNotLeakWhenDurationZeroAndNoSubscribers() {
vtsStop(); //we want to verify the clock fix too
MemoryUtils.RetainedDetector detector = new MemoryUtils.RetainedDetector();

Sinks.Many<Integer> sink = Sinks.many().replay().limit(Duration.ZERO);

for (int i = 0; i < 1_000_000; i++) {
sink.tryEmitNext(detector.tracked(i)).orThrow();
}

System.gc();
Awaitility.await().atMost(Duration.ofSeconds(2))
Awaitility.await().atMost(Duration.ofSeconds(5))
.untilAsserted(() -> {
assertThat(detector.finalizedCount())
.as("finalized so far")
.isCloseTo(detector.trackedTotal(), Offset.offset(130L)); //TODO investigate remaining 130 references
.isCloseTo(detector.trackedTotal(), Offset.offset(200L));
});
//TODO investigate remaining references. prior to #2994 fix it would retain all elements and have abysmal performance
}

@Test
void shouldNotLeakWhenDurationZeroAndTwoLateSubscribers() {
vtsStop(); //we want to verify the clock fix too
MemoryUtils.RetainedDetector detector = new MemoryUtils.RetainedDetector();

Sinks.Many<Integer> sink = Sinks.many().replay().limit(Duration.ZERO);
Expand All @@ -118,17 +120,17 @@ else if (i == 700_000) {
.untilAsserted(() -> {
assertThat(detector.finalizedCount())
.as("finalized so far")
.isCloseTo(detector.trackedTotal(), Offset.offset(130L)); //TODO investigate remaining 130 references
.isCloseTo(detector.trackedTotal(), Offset.offset(200L));
});
//TODO investigate remaining references. prior to #2994 fix it would retain all elements and have abysmal performance
}

@Test
void checkTimeCacheResubscribesAndCompletesAfterRepetitions() {
VirtualTimeScheduler.reset();
Flux<Integer> flow = getSource2()
.doOnSubscribe(__ -> log.info("Loading source..."))
.doOnSubscribe(__ -> log.debug("Loading source..."))
.cache(Duration.ofMillis(200))
.doOnSubscribe(__ -> log.info("Pooling cycle starting..."))
.doOnSubscribe(__ -> log.debug("Pooling cycle starting..."))
.flatMap(this::process, 2)
.repeat(1000);

Expand All @@ -143,7 +145,7 @@ private Flux<Integer> getSource2() {

private Mono<Integer> process(int channel) {
return Mono.just(channel)
.doOnNext(rec -> log.info("Processing: {}", rec))
.doOnNext(rec -> log.debug("Processing: {}", rec))
.delayElement(Duration.ofMillis(5));
}

Expand All @@ -156,7 +158,7 @@ void checkTimeCacheResubscribesAndCompletesAfterRepetitions2() {
final AtomicInteger pollEnd = new AtomicInteger();

Flux<String> flow = getSource()
.doOnSubscribe(__ -> log.info("Loading source #" + sourceLoad.incrementAndGet()))
.doOnSubscribe(__ -> log.debug("Loading source #" + sourceLoad.incrementAndGet()))
.cache(Duration.ofSeconds(1), vts)
.flatMap(v -> {
if (v == 1) {
Expand All @@ -177,7 +179,7 @@ else if (v == 5) { //this assume a 5 element source

private Flux<Integer> getSource() {
return Flux.just(1, 2, 3, 4, 5)
.doOnRequest(r -> log.info("source.request({})", r == Long.MAX_VALUE ? "unbounded" : r))
.doOnRequest(r -> log.debug("source.request({})", r == Long.MAX_VALUE ? "unbounded" : r))
.hide();
}

Expand All @@ -186,7 +188,7 @@ private Mono<String> process(String channel, VirtualTimeScheduler timeScheduler)
timeScheduler.advanceTimeBy(Duration.ofMillis(1001));
}
return Mono.fromCallable(() -> {
log.info("Processing: {}", channel);
log.debug("Processing: {}", channel);
return channel;
});
}
Expand Down Expand Up @@ -234,6 +236,7 @@ public void failTime() {
}

@Test
@Tag("VirtualTime")
public void cacheFlux() {

Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
Expand Down Expand Up @@ -262,6 +265,7 @@ public void cacheFlux() {
}

@Test
@Tag("VirtualTime")
public void cacheFluxFused() {

Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
Expand Down Expand Up @@ -289,6 +293,7 @@ public void cacheFluxFused() {
}

@Test
@Tag("VirtualTime")
public void cacheFluxTTL() {
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
Expand All @@ -315,6 +320,7 @@ public void cacheFluxTTL() {
}

@Test
@Tag("VirtualTime")
public void cacheFluxTTLFused() {

Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
Expand All @@ -341,6 +347,7 @@ public void cacheFluxTTLFused() {
}

@Test
@Tag("VirtualTime")
public void cacheFluxTTLMillis() {
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
Expand All @@ -367,6 +374,7 @@ public void cacheFluxTTLMillis() {
}

@Test
@Tag("VirtualTime")
public void cacheFluxTTLNanos() {
Flux<Integer> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofNanos(1000), vts)
Expand All @@ -391,6 +399,7 @@ public void cacheFluxTTLNanos() {
}

@Test
@Tag("VirtualTime")
public void cacheFluxHistoryTTL() {

Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
Expand Down Expand Up @@ -418,6 +427,7 @@ public void cacheFluxHistoryTTL() {
}

@Test
@Tag("VirtualTime")
public void cacheFluxHistoryTTLFused() {
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
Expand Down

0 comments on commit 91910fd

Please sign in to comment.