Skip to content

Commit

Permalink
Fix a bug in AbstractFuture.addListener where we failed to follow the…
Browse files Browse the repository at this point in the history
… contract precisely.

ListenableFuture.addListener says that listeners should execute 'immediately' when the future is done.  However, because the old version of addListener only checked the listeners field, it is possible to observe that the future is done and that your listener would not execute immediately.  This cl fixes that by adding an explicit check for isDone.

RELNOTES=Close a short race where you could observe that an AbstractFuture was done, but that listeners wouldn't execute immediately.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=212281545
  • Loading branch information
lukesandberg authored and cpovirk committed Sep 12, 2018
1 parent b611718 commit eb3a9f4
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 20 deletions.
Expand Up @@ -911,6 +911,66 @@ public String toString() {
}
}

// Regression test for a case where we would fail to execute listeners immediately on done futures
// this would be observable from an afterDone callback
public void testListenersExecuteImmediately_fromAfterDone() {
AbstractFuture<String> f =
new AbstractFuture<String>() {
@Override
protected void afterDone() {
final AtomicBoolean ranImmediately = new AtomicBoolean();
addListener(
new Runnable() {
@Override
public void run() {
ranImmediately.set(true);
}
},
MoreExecutors.directExecutor());
assertThat(ranImmediately.get()).isTrue();
}
};
f.set("foo");
}

// Regression test for a case where we would fail to execute listeners immediately on done futures
// this would be observable from a waiter that was just unblocked.
public void testListenersExecuteImmediately_afterWaiterWakesUp() throws Exception {
final AbstractFuture<String> f =
new AbstractFuture<String>() {
@Override
protected void afterDone() {
// this simply delays executing listeners
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt(); // preserve status
}
}
};
Thread t =
new Thread() {
@Override
public void run() {
f.set("foo");
}
};
t.start();
f.get();
final AtomicBoolean ranImmediately = new AtomicBoolean();
f.addListener(
new Runnable() {
@Override
public void run() {
ranImmediately.set(true);
}
},
MoreExecutors.directExecutor());
assertThat(ranImmediately.get()).isTrue();
t.interrupt();
t.join();
}

private static void awaitUnchecked(final CyclicBarrier barrier) {
try {
barrier.await();
Expand Down
Expand Up @@ -677,16 +677,27 @@ protected final boolean wasInterrupted() {
public void addListener(Runnable listener, Executor executor) {
checkNotNull(listener, "Runnable was null.");
checkNotNull(executor, "Executor was null.");
Listener oldHead = listeners;
if (oldHead != Listener.TOMBSTONE) {
Listener newNode = new Listener(listener, executor);
do {
newNode.next = oldHead;
if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
return;
}
oldHead = listeners; // re-read
} while (oldHead != Listener.TOMBSTONE);
// Checking isDone and listeners != TOMBSTONE may seem redundant, but our contract for
// addListener says that listeners execute 'immediate' if the future isDone(). However, our
// protocol for completing a future is to assign the value field (which sets isDone to true) and
// then to release waiters, followed by executing afterDone(), followed by releasing listeners.
// That means that it is possible to observe that the future isDone and that your listeners
// don't execute 'immediately'. By checking isDone here we avoid that.
// A corollary to all that is that we don't need to check isDone inside the loop because if we
// get into the loop we know that we weren't done when we entered and therefore we aren't under
// an obligation to execute 'immediately'.
if (!isDone()) {
Listener oldHead = listeners;
if (oldHead != Listener.TOMBSTONE) {
Listener newNode = new Listener(listener, executor);
do {
newNode.next = oldHead;
if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
return;
}
oldHead = listeners; // re-read
} while (oldHead != Listener.TOMBSTONE);
}
}
// If we get here then the Listener TOMBSTONE was set, which means the future is done, call
// the listener.
Expand Down
Expand Up @@ -911,6 +911,66 @@ public String toString() {
}
}

// Regression test for a case where we would fail to execute listeners immediately on done futures
// this would be observable from an afterDone callback
public void testListenersExecuteImmediately_fromAfterDone() {
AbstractFuture<String> f =
new AbstractFuture<String>() {
@Override
protected void afterDone() {
final AtomicBoolean ranImmediately = new AtomicBoolean();
addListener(
new Runnable() {
@Override
public void run() {
ranImmediately.set(true);
}
},
MoreExecutors.directExecutor());
assertThat(ranImmediately.get()).isTrue();
}
};
f.set("foo");
}

// Regression test for a case where we would fail to execute listeners immediately on done futures
// this would be observable from a waiter that was just unblocked.
public void testListenersExecuteImmediately_afterWaiterWakesUp() throws Exception {
final AbstractFuture<String> f =
new AbstractFuture<String>() {
@Override
protected void afterDone() {
// this simply delays executing listeners
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt(); // preserve status
}
}
};
Thread t =
new Thread() {
@Override
public void run() {
f.set("foo");
}
};
t.start();
f.get();
final AtomicBoolean ranImmediately = new AtomicBoolean();
f.addListener(
new Runnable() {
@Override
public void run() {
ranImmediately.set(true);
}
},
MoreExecutors.directExecutor());
assertThat(ranImmediately.get()).isTrue();
t.interrupt();
t.join();
}

private static void awaitUnchecked(final CyclicBarrier barrier) {
try {
barrier.await();
Expand Down
31 changes: 21 additions & 10 deletions guava/src/com/google/common/util/concurrent/AbstractFuture.java
Expand Up @@ -677,16 +677,27 @@ protected final boolean wasInterrupted() {
public void addListener(Runnable listener, Executor executor) {
checkNotNull(listener, "Runnable was null.");
checkNotNull(executor, "Executor was null.");
Listener oldHead = listeners;
if (oldHead != Listener.TOMBSTONE) {
Listener newNode = new Listener(listener, executor);
do {
newNode.next = oldHead;
if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
return;
}
oldHead = listeners; // re-read
} while (oldHead != Listener.TOMBSTONE);
// Checking isDone and listeners != TOMBSTONE may seem redundant, but our contract for
// addListener says that listeners execute 'immediate' if the future isDone(). However, our
// protocol for completing a future is to assign the value field (which sets isDone to true) and
// then to release waiters, followed by executing afterDone(), followed by releasing listeners.
// That means that it is possible to observe that the future isDone and that your listeners
// don't execute 'immediately'. By checking isDone here we avoid that.
// A corollary to all that is that we don't need to check isDone inside the loop because if we
// get into the loop we know that we weren't done when we entered and therefore we aren't under
// an obligation to execute 'immediately'.
if (!isDone()) {
Listener oldHead = listeners;
if (oldHead != Listener.TOMBSTONE) {
Listener newNode = new Listener(listener, executor);
do {
newNode.next = oldHead;
if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
return;
}
oldHead = listeners; // re-read
} while (oldHead != Listener.TOMBSTONE);
}
}
// If we get here then the Listener TOMBSTONE was set, which means the future is done, call
// the listener.
Expand Down

0 comments on commit eb3a9f4

Please sign in to comment.