Skip to content

Commit

Permalink
Removed outstanding from state and fixed bug with termination upon ti…
Browse files Browse the repository at this point in the history
…mer scheduling rejection
  • Loading branch information
chemicL committed Apr 30, 2024
1 parent d74b24f commit 156711c
Showing 1 changed file with 14 additions and 41 deletions.
Expand Up @@ -147,6 +147,10 @@ final static class BufferTimeoutWithBackpressureSubscriber<T, C extends Collecti

private @Nullable Throwable error;
private boolean done;
// Access to outstanding is always guarded by volatile access to state so it
// needn't be volatile. It is also only ever accessed in the drain method so it
// needn't be part of state either.
private int outstanding;

// tracks unsatisfied downstream demand (expressed in # of buffers)
volatile long requested;
Expand All @@ -168,16 +172,12 @@ final static class BufferTimeoutWithBackpressureSubscriber<T, C extends Collecti
static final long TIMEOUT_FLAG =
0b0001_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L;
static final long REQUESTED_INDEX_MASK =
0b0000_1111_1111_1111_1111_1111_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L;
static final long OUTSTANDING_MASK =
0b0000_0000_0000_0000_0000_0000_1111_1111_1111_1111_1111_0000_0000_0000_0000_0000L;
0b0000_1111_1111_1111_1111_1111_1111_1111_0000_0000_0000_0000_0000_0000_0000_0000L;
static final long INDEX_MASK =
0b0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_1111_1111_1111_1111_1111L;
0b0000_0000_0000_0000_0000_0000_0000_0000_1111_1111_1111_1111_1111_1111_1111_1111L;

static final int INDEX_SHIFT = 0;
static final int OUTSTANDING_SHIFT = 20;
static final int REQUESTED_INDEX_SHIFT = 40;
static final int INDEX_LIMIT = 1 << OUTSTANDING_SHIFT; // 1048576; // 2^20
static final int REQUESTED_INDEX_SHIFT = 32;

public BufferTimeoutWithBackpressureSubscriber(
CoreSubscriber<? super C> actual,
Expand All @@ -188,11 +188,6 @@ public BufferTimeoutWithBackpressureSubscriber(
Supplier<C> bufferSupplier,
@Nullable Logger logger) {
this.actual = actual;
// TODO: reconsider OUTSTANDING to be taken out of the mask to allow for higher value
// -> this translates to 4MiB of ints
if (batchSize >= INDEX_LIMIT) {
throw new IllegalArgumentException("Batch size can't exceed " + INDEX_LIMIT + " items");
}
this.batchSize = batchSize;
this.timeSpan = timeSpan;
this.unit = unit;
Expand Down Expand Up @@ -301,11 +296,7 @@ public void onNext(T t) {
// flushes. However, timeout does not reset it to 0, it has its own
// flag.
boolean terminated = false;
if (terminated) {
previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated);
} else {
previousState = forceAddWork(this, state -> incrementIndex(state, 1));
}
previousState = forceAddWork(this, state -> incrementIndex(state, 1));
// We can only fire the timer once we increment the index first so that
// the timer doesn't fire first as it would consume the element and try
// to decrement the index below 0.
Expand All @@ -317,7 +308,7 @@ public void onNext(T t) {
currentTimeoutTask.update(disposable);
} catch (RejectedExecutionException e) {
this.error = Operators.onRejectedExecution(e, subscription, null, t, actual.currentContext());
terminated = true;
previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated);
}
}
} else {
Expand Down Expand Up @@ -456,18 +447,14 @@ private void drain(boolean resumeDemand) {
boolean terminated = isTerminated(currentState);

if (consumed > 0) {
// currentState = addOutstanding(this, -consumed);
long decrement = -consumed;
currentState = forceUpdate(this, state -> addOutstanding(state, decrement));
previousState = addOutstanding(previousState, decrement);
outstanding -= consumed;
}
if (!terminated && currentRequest > 0) {
// request more from upstream
int remaining = getOutstanding(currentState);
int remaining = this.outstanding;
int replenishMark = prefetch >> 1; // TODO: create field limit instead
if (remaining < replenishMark) {
currentState = requestMore(prefetch - remaining);
previousState = addOutstanding(previousState, prefetch - remaining);
requestMore(prefetch - remaining);
}
}

Expand Down Expand Up @@ -527,13 +514,12 @@ int flush() {
return i;
}

private long requestMore(int n) {
private void requestMore(int n) {
if (logger != null) {
trace(logger, "requestMore " + n);
}
long currentState = forceUpdate(this, state -> addOutstanding(state, n));
outstanding += n;
Objects.requireNonNull(this.subscription).request(n);
return currentState;
}

@Override
Expand Down Expand Up @@ -637,19 +623,6 @@ private static long forceUpdate(BufferTimeoutWithBackpressureSubscriber<?, ?> in
}
}

private static int getOutstanding(long state) {
return (int) ((state & OUTSTANDING_MASK) >> OUTSTANDING_SHIFT);
}

private static long addOutstanding(long state, long amount) {
long previousWithOutstandingClear = state &~ OUTSTANDING_MASK;
long outstandingMax = OUTSTANDING_MASK >> OUTSTANDING_SHIFT;
long current = (state & OUTSTANDING_MASK) >> OUTSTANDING_SHIFT;
long added = Math.min(current + amount, outstandingMax);
long newOutstanding = ((long) added) << OUTSTANDING_SHIFT;
return previousWithOutstandingClear | newOutstanding;
}

private static <T, C extends Collection<? super T>> long tryClearWip(BufferTimeoutWithBackpressureSubscriber<T, C> instance, long expectedState) {
for (;;) {
final long currentState = instance.state;
Expand Down

0 comments on commit 156711c

Please sign in to comment.