Skip to content

Commit

Permalink
Fixed issue with timer firing immediately before index got incremented
Browse files Browse the repository at this point in the history
  • Loading branch information
chemicL committed Apr 30, 2024
1 parent 16d2d1f commit d74b24f
Showing 1 changed file with 11 additions and 18 deletions.
Expand Up @@ -301,6 +301,14 @@ public void onNext(T t) {
// flushes. However, timeout does not reset it to 0, it has its own
// flag.
boolean terminated = false;
if (terminated) {
previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated);
} else {
previousState = forceAddWork(this, state -> incrementIndex(state, 1));
}
// We can only fire the timer once we increment the index first so that
// the timer doesn't fire first as it would consume the element and try
// to decrement the index below 0.
if (getIndex(previousState) == 0) {
// fire timer, new buffer starts
try {
Expand All @@ -312,11 +320,6 @@ public void onNext(T t) {
terminated = true;
}
}
if (terminated) {
previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated);
} else {
previousState = forceAddWork(this, state -> incrementIndex(state, 1));
}
} else {
previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated);
}
Expand Down Expand Up @@ -469,8 +472,6 @@ private void drain(boolean resumeDemand) {
}

if (terminated && queue.isEmpty()) {
// TODO: make sure we don't do this unless we
// know all the values were delivered first
done = true;
if (logger != null) {
trace(logger, "terminated! error: " + this.error + " queue size: " + queue.size());
Expand All @@ -489,10 +490,6 @@ private void drain(boolean resumeDemand) {
previousState = resetTimeout(incrementIndex(previousState, toDecrement));
}

// TODO: If we force an update before we lose the knowledge that
// some other actor modified something in between, so we must make
// sure to track these updates in both current and previous and
// compare.
currentState = tryClearWip(this, previousState);

// If the state changed (e.g. new item arrived, a request was issued,
Expand Down Expand Up @@ -581,8 +578,7 @@ private static long incrementRequestIndex(long state) {
}

private static long getIndex(long state) {
long l = (state & INDEX_MASK) >> INDEX_SHIFT;
return l;
return (state & INDEX_MASK) >> INDEX_SHIFT;
}

private static long incrementIndex(long state, int amount) {
Expand All @@ -605,10 +601,6 @@ private static long resetTimeout(long state) {
return state & ~TIMEOUT_FLAG;
}

private static long resetIndex(long state) {
return state & ~INDEX_MASK;
}

private static boolean isTimedOut(long state) {
return (state & TIMEOUT_FLAG) == TIMEOUT_FLAG;
}
Expand Down Expand Up @@ -666,7 +658,8 @@ private static <T, C extends Collection<? super T>> long tryClearWip(BufferTimeo
return currentState;
}

long nextState = currentState & ~HAS_WORK_IN_PROGRESS_FLAG;
// remove both WIP and requested_index so that we avoid overflowing
long nextState = currentState & ~HAS_WORK_IN_PROGRESS_FLAG & ~REQUESTED_INDEX_MASK;
if (STATE.compareAndSet(instance, currentState, nextState)) {
if (instance.stateLogger != null) {
instance.stateLogger.log(instance.toString(),
Expand Down

0 comments on commit d74b24f

Please sign in to comment.