Skip to content

Commit

Permalink
change default, add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Scottmitch committed Oct 2, 2023
1 parent b700f94 commit b8c6237
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static <T> ReplayStrategyBuilder<T> historyBuilder(int history) {
* @return a {@link ReplayStrategyBuilder} using the historyHint and TTL strategy.
*/
public static <T> ReplayStrategyBuilder<T> historyTtlBuilder(int historyHint, Duration ttl, Executor executor) {
return historyTtlBuilder(historyHint, ttl, executor, false);
return historyTtlBuilder(historyHint, ttl, executor, true);
}

/**
Expand All @@ -85,7 +85,18 @@ public static <T> ReplayStrategyBuilder<T> historyTtlBuilder(int historyHint, Du
* is more likely to retain {@code historyHint} elements in memory in steady state, but avoids cost of
* scheduling timer tasks.</li>
* <li>{@code false} will evict expired items eagerly when they expire. If {@code ttl} is lower that
* {@code historyHint} relative to signal arrival rate this can use less memory but schedules time tasks.</li>
* {@code historyHint} relative to signal arrival rate this can use less memory but schedules time tasks.
* <p>
* Note that timer expiration is done concurrently which may cause gaps in signal delivery to new subscribers.
* For example:
* <pre>
* initial accumulation state: [onNext1, onNext2, onNext3, onNext4]
* Thread1: add new subscriber, subscriber does request(4), deliver onNext1
* Thread2: expire onNext1, onNext2, onNext3
* Thread1: [deliver onNext2,] deliver onNext4
* Note that onNext2 may or may not be delivered, and onNext3 was not delivered.
* </pre>
* </li>
* </ul>
* @param <T> The type of {@link ReplayStrategyBuilder}.
* @return a {@link ReplayStrategyBuilder} using the historyHint and TTL strategy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ void concurrentTTL(boolean onError, boolean lazy) throws Exception {
private void waitForReplayQueueToDrain(Publisher<Integer> publisher) throws InterruptedException {
boolean waitForAccumulatorToDrain;
do {
Thread.sleep(1);
Thread.sleep(1); // Give the timer task a chance to expire signals.
TestPublisherSubscriber<Integer> subscriber5 = new TestPublisherSubscriber<>();
toSource(publisher).subscribe(subscriber5);
PublisherSource.Subscription subscription5 = subscriber5.awaitSubscription();
Expand Down

0 comments on commit b8c6237

Please sign in to comment.