Skip to content

Commit

Permalink
Move interruptTask() call to immediately before afterDone() call.
Browse files Browse the repository at this point in the history
Once this CL is submitted, users who override `interruptTask()` will be able to migrate to overriding `afterDone()` with no further change to behavior.

RELNOTES=`util.concurrent`: Changed `AbstractFuture` to run `interruptTask()` just before `afterDone()`. Until this change, it ran slightly earlier than that: We used to run it before unblocking any pending `get()` calls, and now we run it after.
PiperOrigin-RevId: 482836593
  • Loading branch information
cpovirk authored and Google Java Core Libraries committed Oct 21, 2022
1 parent f9d336f commit b337be6
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,13 @@ public void run() {
}
Object valueToSet = getFutureValue(future);
if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) {
complete(owner);
complete(
owner,
/*
* Interruption doesn't propagate through a SetFuture chain (see getFutureValue), so
* don't invoke interruptTask.
*/
false);
}
}
}
Expand Down Expand Up @@ -656,12 +662,7 @@ mayInterruptIfRunning, new CancellationException("Future.cancel() was called."))
while (true) {
if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) {
rValue = true;
// We call interruptTask before calling complete(), which is consistent with
// FutureTask
if (mayInterruptIfRunning) {
abstractFuture.interruptTask();
}
complete(abstractFuture);
complete(abstractFuture, mayInterruptIfRunning);
if (localValue instanceof SetFuture) {
// propagate cancellation to the future set in setfuture, this is racy, and we don't
// care if we are successful or not.
Expand Down Expand Up @@ -779,7 +780,7 @@ public void addListener(Runnable listener, Executor executor) {
protected boolean set(@ParametricNullness V value) {
Object valueToSet = value == null ? NULL : value;
if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
complete(this);
complete(this, /*callInterruptTask=*/ false);
return true;
}
return false;
Expand All @@ -804,7 +805,7 @@ protected boolean set(@ParametricNullness V value) {
protected boolean setException(Throwable throwable) {
Object valueToSet = new Failure(checkNotNull(throwable));
if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
complete(this);
complete(this, /*callInterruptTask=*/ false);
return true;
}
return false;
Expand Down Expand Up @@ -847,7 +848,13 @@ protected boolean setFuture(ListenableFuture<? extends V> future) {
if (future.isDone()) {
Object value = getFutureValue(future);
if (ATOMIC_HELPER.casValue(this, null, value)) {
complete(this);
complete(
this,
/*
* Interruption doesn't propagate through a SetFuture chain (see getFutureValue), so
* don't invoke interruptTask.
*/
false);
return true;
}
return false;
Expand Down Expand Up @@ -989,14 +996,26 @@ private static Object getFutureValue(ListenableFuture<?> future) {
}

/** Unblocks all threads and runs all listeners. */
private static void complete(AbstractFuture<?> param) {
private static void complete(AbstractFuture<?> param, boolean callInterruptTask) {
// Declare a "true" local variable so that the Checker Framework will infer nullness.
AbstractFuture<?> future = param;

Listener next = null;
outer:
while (true) {
future.releaseWaiters();
/*
* We call interruptTask() immediately before afterDone() so that migrating between the two
* can be a no-op.
*/
if (callInterruptTask) {
future.interruptTask();
/*
* Interruption doesn't propagate through a SetFuture chain (see getFutureValue), so don't
* invoke interruptTask on any subsequent futures.
*/
callInterruptTask = false;
}
// We call this before the listeners in order to avoid needing to manage a separate stack data
// structure for them. Also, some implementations rely on this running prior to listeners
// so that the cleanup work is visible to listeners.
Expand Down
41 changes: 30 additions & 11 deletions guava/src/com/google/common/util/concurrent/AbstractFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,13 @@ public void run() {
}
Object valueToSet = getFutureValue(future);
if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) {
complete(owner);
complete(
owner,
/*
* Interruption doesn't propagate through a SetFuture chain (see getFutureValue), so
* don't invoke interruptTask.
*/
false);
}
}
}
Expand Down Expand Up @@ -656,12 +662,7 @@ mayInterruptIfRunning, new CancellationException("Future.cancel() was called."))
while (true) {
if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) {
rValue = true;
// We call interruptTask before calling complete(), which is consistent with
// FutureTask
if (mayInterruptIfRunning) {
abstractFuture.interruptTask();
}
complete(abstractFuture);
complete(abstractFuture, mayInterruptIfRunning);
if (localValue instanceof SetFuture) {
// propagate cancellation to the future set in setfuture, this is racy, and we don't
// care if we are successful or not.
Expand Down Expand Up @@ -779,7 +780,7 @@ public void addListener(Runnable listener, Executor executor) {
protected boolean set(@ParametricNullness V value) {
Object valueToSet = value == null ? NULL : value;
if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
complete(this);
complete(this, /*callInterruptTask=*/ false);
return true;
}
return false;
Expand All @@ -804,7 +805,7 @@ protected boolean set(@ParametricNullness V value) {
protected boolean setException(Throwable throwable) {
Object valueToSet = new Failure(checkNotNull(throwable));
if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
complete(this);
complete(this, /*callInterruptTask=*/ false);
return true;
}
return false;
Expand Down Expand Up @@ -847,7 +848,13 @@ protected boolean setFuture(ListenableFuture<? extends V> future) {
if (future.isDone()) {
Object value = getFutureValue(future);
if (ATOMIC_HELPER.casValue(this, null, value)) {
complete(this);
complete(
this,
/*
* Interruption doesn't propagate through a SetFuture chain (see getFutureValue), so
* don't invoke interruptTask.
*/
false);
return true;
}
return false;
Expand Down Expand Up @@ -989,14 +996,26 @@ private static Object getFutureValue(ListenableFuture<?> future) {
}

/** Unblocks all threads and runs all listeners. */
private static void complete(AbstractFuture<?> param) {
private static void complete(AbstractFuture<?> param, boolean callInterruptTask) {
// Declare a "true" local variable so that the Checker Framework will infer nullness.
AbstractFuture<?> future = param;

Listener next = null;
outer:
while (true) {
future.releaseWaiters();
/*
* We call interruptTask() immediately before afterDone() so that migrating between the two
* can be a no-op.
*/
if (callInterruptTask) {
future.interruptTask();
/*
* Interruption doesn't propagate through a SetFuture chain (see getFutureValue), so don't
* invoke interruptTask on any subsequent futures.
*/
callInterruptTask = false;
}
// We call this before the listeners in order to avoid needing to manage a separate stack data
// structure for them. Also, some implementations rely on this running prior to listeners
// so that the cleanup work is visible to listeners.
Expand Down

0 comments on commit b337be6

Please sign in to comment.