Skip to content

Commit

Permalink
Avoid FluxReplay buffer retaining/leaking with tiny TTL (#2998)
Browse files Browse the repository at this point in the history
This commit eliminates two sources of over-retaining with TTL Duration
less than 1ms. It is fixing the add method, which also attempts to clear
up outdated nodes from the linked data structure.

First it treats the Duration.ZERO case as a fast path that will directly
set the head to the newly added node. Even if it is technically outdated
it is the only one that needs to be kept around because it must remain
visible to the subsequent `replay` call to the active subscribers.

Second, it ensures that if the whole list is traversed without finding
a node which `time` is greater than the limit, the head is still moved
to the newly added node. This is more of a corner case where the TTL is
small enough that valueNode's time is not > limit.

Both changes avoid marking most nodes as removable while skipping the
actual removal, which can lead to degraded performance and memory
pressure when a lot of elements are added to the buffer.

In addition, this commit reworks and improve the FluxReplayTest cases
(reorganized test order, removed VirtualTimeScheduler unless strictly
needed, turned info-level logging to debug-level).

Fixes #2994.
  • Loading branch information
simonbasle committed Apr 7, 2022
1 parent 67319ff commit 5297df9
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 44 deletions.
31 changes: 26 additions & 5 deletions reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -138,6 +138,15 @@ static final class TimedNode<T> extends AtomicReference<TimedNode<T>> {
this.value = value;
this.time = time;
}

@Override
public String toString() {
return "TimedNode{" +
"index=" + index +
", value=" + value +
", time=" + time +
'}';
}
}

final int limit;
Expand Down Expand Up @@ -423,11 +432,11 @@ public int capacity() {
@Override
public void add(T value) {
final TimedNode<T> tail = this.tail;
final TimedNode<T> n = new TimedNode<>(tail.index + 1,
final TimedNode<T> valueNode = new TimedNode<>(tail.index + 1,
value,
scheduler.now(TimeUnit.NANOSECONDS));
tail.set(n);
this.tail = n;
tail.set(valueNode);
this.tail = valueNode;
int s = size;
if (s == limit) {
head = head.get();
Expand All @@ -437,6 +446,16 @@ public void add(T value) {
}
long limit = scheduler.now(TimeUnit.NANOSECONDS) - maxAge;

//short-circuit if maxAge == 0: no sense in keeping any old value.
//we still want to keep the newly added value in order for the immediately following replay
//to propagate it to currently registered subscribers.
if (maxAge == 0) {
head = valueNode;
return;
}

//otherwise, start walking the linked list in order to find the first node > time limit
//(in which case we'll have passed all nodes that have reached the TTL).
TimedNode<T> h = head;
TimedNode<T> next;
int removed = 0;
Expand All @@ -446,7 +465,9 @@ public void add(T value) {
break;
}

if (next.time > limit) {
if (next.time > limit || next == valueNode) {
//next == valueNode case causes head swap and actual removal, even if its time cannot be > limit.
//otherwise we'd skip removal and re-walk the whole linked list on next add, retaining outdated values for nothing.
if (removed != 0) {
size = size - removed;
head = h;
Expand Down

0 comments on commit 5297df9

Please sign in to comment.