Skip to content

Commit

Permalink
Release the input futures as soon as we submit the combiner task. But…
Browse files Browse the repository at this point in the history
… really, redo how we release resources in general.

This is a followup to CL 265489523, which "only" released the input futures as soon as the combiner task finished running (which had often happened even before that CL but hadn't if the combiner returned a Future that was still pending). That CL was good enough for practical purposes, but I wanted to better understand how we release resources.

This CL standardizes on AggregateFuture.releaseResources() as the way to null out all fields[*], merging logic from releaseResources(), releaseResourcesAfterFailure(), and AsyncCallableInterruptibleTask.setValue(...). As part of that, it merges AggregateFuture and AggregateFutureState/RunningState into a single object.

[*] OK, except seenExceptions, which gets its own handling. As a bonus, I believe that this CL clears seenExceptions earlier than it used to be cleared in the CombinedFuture case. Specifically, it clears it when all inputs are done, rather than when the combiner task has finished running.

It turns out that blindly nulling out fields is too aggressive, so we need to be careful in 2 cases:

1. CombinedFuture.releaseResources() can't null out `task` until the future is done or the task is done running. That's because it may need to interrupt the task. To handle this, I don't null out `task` in releaseResources() unless isDone(). To ensure that `task` still gets nulled out as soon as it's done running, I null it out directly in afterRanInterruptibly(). (OK, this is another exception to my claim that releaseResources() handles nulling out "all" fields....)

2. Even if the output future is done, processCompleted() sometimes needs access to the original futures in order to see whether any of them failed. To handle this, I store them in the listener and pass them through to processCompleted() (when necessary).

The changes to prod code are net negative in line count, at least ignoring the added comments. I've also added a couple tests, only one of which passed before this CL. And I think the model for when fields are nulled out is overall clearer after this CL. So hopefully this CL is a step forward, despite the complexity of the changes and the remaining complexity in the code. (I also included a few unrelated simplifications, like not bothering to check collectsValues before calling collectOneValue(...).)

(Aside: This CL's releaseResources() is like our proposed afterCommit() API but different. First, releaseResources() may be called even before set() or setAsync() in the CombinedFuture case. Second, CombinedFuture may rely on the fact that it's called twice in some cases: It's called once when all inputs complete, but it doesn't null out `task`, and then it can be called again if the output is cancelled, at which point it *does* null out `task`. But that probably doesn't matter too much because the task was probably handed to an executor in the meantime, so CombinedFuture is unlikely to hold the final reference to it. Anyway, for more discussion of afterCommit(), see #2886)

[]

RELNOTES=n/a

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=266130757
  • Loading branch information
cpovirk authored and kluever committed Aug 29, 2019
1 parent b503ce6 commit bbc26e1
Show file tree
Hide file tree
Showing 12 changed files with 882 additions and 668 deletions.
Expand Up @@ -46,6 +46,7 @@
import static com.google.common.util.concurrent.TestPlatform.getDoneFromTimeoutOverload;
import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static com.google.common.util.concurrent.testing.TestingExecutors.noOpScheduledExecutor;
import static java.util.Arrays.asList;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Expand Down Expand Up @@ -2803,33 +2804,82 @@ public ListenableFuture<String> call() throws Exception {

@AndroidIncompatible
@GwtIncompatible
public void testWhenAllSucceed_releasesMemory() throws Exception {
public void testWhenAllSucceed_releasesInputFuturesUponSubmission() throws Exception {
SettableFuture<Long> future1 = SettableFuture.create();
SettableFuture<Long> future2 = SettableFuture.create();
WeakReference<SettableFuture<Long>> future1Ref = new WeakReference<>(future1);
WeakReference<SettableFuture<Long>> future2Ref = new WeakReference<>(future2);

AsyncCallable<Long> combiner =
new AsyncCallable<Long>() {
Callable<Long> combiner =
new Callable<Long>() {
@Override
public ListenableFuture<Long> call() throws Exception {
return SettableFuture.create();
public Long call() {
throw new AssertionError();
}
};

ListenableFuture<Long> unused =
whenAllSucceed(future1, future2).callAsync(combiner, directExecutor());
whenAllSucceed(future1, future2).call(combiner, noOpScheduledExecutor());

future1.set(1L);
future1 = null;
future2.set(2L);
future2 = null;

// Futures should be collected even if combiner future never finishes.
/*
* Futures should be collected even if combiner never runs. This is kind of a silly test, since
* the combiner is almost certain to hold its own reference to the futures, and a real app would
* hold a reference to the executor and thus to the combiner. What we really care about is that
* the futures are released once the combiner is done running. But we happen to provide this
* earlier cleanup at the moment, so we're testing it.
*/
GcFinalization.awaitClear(future1Ref);
GcFinalization.awaitClear(future2Ref);
}

@AndroidIncompatible
@GwtIncompatible
public void testWhenAllComplete_releasesInputFuturesUponCancellation() throws Exception {
SettableFuture<Long> future = SettableFuture.create();
WeakReference<SettableFuture<Long>> futureRef = new WeakReference<>(future);

Callable<Long> combiner =
new Callable<Long>() {
@Override
public Long call() {
throw new AssertionError();
}
};

ListenableFuture<Long> unused = whenAllComplete(future).call(combiner, noOpScheduledExecutor());

unused.cancel(false);
future = null;

// Future should be collected because whenAll*Complete* doesn't need to look at its result.
GcFinalization.awaitClear(futureRef);
}

@AndroidIncompatible
@GwtIncompatible
public void testWhenAllSucceed_releasesCallable() throws Exception {
AsyncCallable<Long> combiner =
new AsyncCallable<Long>() {
@Override
public ListenableFuture<Long> call() {
return SettableFuture.create();
}
};
WeakReference<AsyncCallable<Long>> combinerRef = new WeakReference<>(combiner);

ListenableFuture<Long> unused =
whenAllSucceed(immediateFuture(1L)).callAsync(combiner, directExecutor());

combiner = null;
// combiner should be collected even if the future it returns never completes.
GcFinalization.awaitClear(combinerRef);
}

/*
* TODO(cpovirk): maybe pass around TestFuture instances instead of
* ListenableFuture instances
Expand Down Expand Up @@ -3463,6 +3513,24 @@ public void testSuccessfulAsList_logging_error() throws Exception {
assertThat(logged.get(0).getThrown()).isInstanceOf(MyError.class);
}

public void testSuccessfulAsList_failureLoggedEvenAfterOutputCancelled() throws Exception {
ListenableFuture<String> input = new CancelPanickingFuture<>();
ListenableFuture<List<String>> output = successfulAsList(input);
output.cancel(false);

List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords();
assertThat(logged).hasSize(1);
assertThat(logged.get(0).getThrown()).hasMessageThat().isEqualTo("You can't fire me, I quit.");
}

private static final class CancelPanickingFuture<V> extends AbstractFuture<V> {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
setException(new Error("You can't fire me, I quit."));
return false;
}
}

public void testNonCancellationPropagating_successful() throws Exception {
SettableFuture<Foo> input = SettableFuture.create();
ListenableFuture<Foo> wrapper = nonCancellationPropagating(input);
Expand Down Expand Up @@ -3517,22 +3585,6 @@ private static class TestException extends Exception {
}
}

@GwtIncompatible // used only in GwtIncompatible tests
private static final Function<Exception, TestException> mapper =
new Function<Exception, TestException>() {
@Override
public TestException apply(Exception from) {
if (from instanceof ExecutionException) {
return new TestException(from.getCause());
} else {
assertTrue(
"got " + from.getClass(),
from instanceof InterruptedException || from instanceof CancellationException);
return new TestException(from);
}
}
};

@GwtIncompatible // used only in GwtIncompatible tests
private interface MapperFunction extends Function<Throwable, Exception> {}

Expand Down

0 comments on commit bbc26e1

Please sign in to comment.