Skip to content

Commit

Permalink
Merge #2998 into 3.5.0-M2
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Apr 7, 2022
2 parents 53b80ba + 5297df9 commit ff76d76
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 43 deletions.
31 changes: 26 additions & 5 deletions reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -138,6 +138,15 @@ static final class TimedNode<T> extends AtomicReference<TimedNode<T>> {
this.value = value;
this.time = time;
}

@Override
public String toString() {
return "TimedNode{" +
"index=" + index +
", value=" + value +
", time=" + time +
'}';
}
}

final int limit;
Expand Down Expand Up @@ -423,11 +432,11 @@ public int capacity() {
@Override
public void add(T value) {
final TimedNode<T> tail = this.tail;
final TimedNode<T> n = new TimedNode<>(tail.index + 1,
final TimedNode<T> valueNode = new TimedNode<>(tail.index + 1,
value,
scheduler.now(TimeUnit.NANOSECONDS));
tail.set(n);
this.tail = n;
tail.set(valueNode);
this.tail = valueNode;
int s = size;
if (s == limit) {
head = head.get();
Expand All @@ -437,6 +446,16 @@ public void add(T value) {
}
long limit = scheduler.now(TimeUnit.NANOSECONDS) - maxAge;

//short-circuit if maxAge == 0: no sense in keeping any old value.
//we still want to keep the newly added value in order for the immediately following replay
//to propagate it to currently registered subscribers.
if (maxAge == 0) {
head = valueNode;
return;
}

//otherwise, start walking the linked list in order to find the first node > time limit
//(in which case we'll have passed all nodes that have reached the TTL).
TimedNode<T> h = head;
TimedNode<T> next;
int removed = 0;
Expand All @@ -446,7 +465,9 @@ public void add(T value) {
break;
}

if (next.time > limit) {
if (next.time > limit || next == valueNode) {
//next == valueNode case causes head swap and actual removal, even if its time cannot be > limit.
//otherwise we'd skip removal and re-walk the whole linked list on next add, retaining outdated values for nothing.
if (removed != 0) {
size = size - removed;
head = h;
Expand Down

0 comments on commit ff76d76

Please sign in to comment.