Skip to content

Commit

Permalink
simplify LazyTimeLimitedReplayAccumulator
Browse files Browse the repository at this point in the history
  • Loading branch information
Scottmitch committed Oct 2, 2023
1 parent b8c6237 commit a78f958
Showing 1 changed file with 11 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public static <T> ReplayStrategyBuilder<T> historyTtlBuilder(int historyHint, Du

private static final class MostRecentReplayAccumulator<T> implements ReplayAccumulator<T> {
private final int maxItems;
private final Deque<Object> queue = new ArrayDeque<>();
private final Deque<Object> items = new ArrayDeque<>();

MostRecentReplayAccumulator(final int maxItems) {
if (maxItems <= 0) {
Expand All @@ -121,29 +121,25 @@ private static final class MostRecentReplayAccumulator<T> implements ReplayAccum

@Override
public void accumulate(@Nullable final T t) {
if (queue.size() >= maxItems) {
queue.poll();
if (items.size() >= maxItems) {
items.poll();
}
queue.add(wrapNull(t));
items.add(wrapNull(t));
}

@Override
public void deliverAccumulation(final Consumer<T> consumer) {
for (Object item : queue) {
for (Object item : items) {
consumer.accept(unwrapNullUnchecked(item));
}
}
}

private static final class LazyTimeLimitedReplayAccumulator<T> implements ReplayAccumulator<T> {
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<LazyTimeLimitedReplayAccumulator> queueSizeUpdater =
AtomicIntegerFieldUpdater.newUpdater(LazyTimeLimitedReplayAccumulator.class, "queueSize");
private final Executor executor;
private final Queue<TimeStampSignal<T>> items;
private final long ttlNanos;
private final int maxItems;
private volatile int queueSize;
private final Deque<TimeStampSignal<T>> items;

LazyTimeLimitedReplayAccumulator(final int maxItems, final Duration ttl, final Executor executor) {
if (ttl.isNegative()) {
Expand All @@ -152,28 +148,18 @@ private static final class LazyTimeLimitedReplayAccumulator<T> implements Replay
if (maxItems <= 0) {
throw new IllegalArgumentException("maxItems: " + maxItems + "(expected >0)");
}
this.maxItems = maxItems;
this.executor = requireNonNull(executor);
this.ttlNanos = ttl.toNanos();
this.maxItems = maxItems;
items = new ConcurrentLinkedQueue<>(); // SpMc
items = new ArrayDeque<>();
}

@Override
public void accumulate(@Nullable final T t) {
final TimeStampSignal<T> signal = new TimeStampSignal<>(executor.currentTime(NANOSECONDS), t);
for (;;) {
final int qSize = queueSize;
if (qSize < maxItems) {
if (queueSizeUpdater.compareAndSet(this, qSize, qSize + 1)) {
items.add(signal);
break;
}
} else if (queueSizeUpdater.compareAndSet(this, qSize, qSize)) {
items.poll();
items.add(signal);
break;
}
if (items.size() >= maxItems) {
items.poll();
}
items.add(new TimeStampSignal<>(executor.currentTime(NANOSECONDS), t));
}

@Override
Expand All @@ -183,7 +169,6 @@ public void deliverAccumulation(final Consumer<T> consumer) {
while (itr.hasNext()) {
final TimeStampSignal<T> next = itr.next();
if (nanoTime - next.timeStamp >= ttlNanos) {
queueSizeUpdater.decrementAndGet(this);
itr.remove();
} else {
consumer.accept(next.signal);
Expand Down

0 comments on commit a78f958

Please sign in to comment.