From bbc26e180ba06dc45daa1aa6a3d56fd14594bb0e Mon Sep 17 00:00:00 2001 From: cpovirk Date: Thu, 29 Aug 2019 06:57:47 -0700 Subject: [PATCH] Release the input futures as soon as we submit the combiner task. But really, redo how we release resources in general. This is a followup to CL 265489523, which "only" released the input futures as soon as the combiner task finished running (which had often happened even before that CL but hadn't if the combiner returned a Future that was still pending). That CL was good enough for practical purposes, but I wanted to better understand how we release resources. This CL standardizes on AggregateFuture.releaseResources() as the way to null out all fields[*], merging logic from releaseResources(), releaseResourcesAfterFailure(), and AsyncCallableInterruptibleTask.setValue(...). As part of that, it merges AggregateFuture and AggregateFutureState/RunningState into a single object. [*] OK, except seenExceptions, which gets its own handling. As a bonus, I believe that this CL clears seenExceptions earlier than it used to be cleared in the CombinedFuture case. Specifically, it clears it when all inputs are done, rather than when the combiner task has finished running. It turns out that blindly nulling out fields is too aggressive, so we need to be careful in 2 cases: 1. CombinedFuture.releaseResources() can't null out `task` until the future is done or the task is done running. That's because it may need to interrupt the task. To handle this, I don't null out `task` in releaseResources() unless isDone(). To ensure that `task` still gets nulled out as soon as it's done running, I null it out directly in afterRanInterruptibly(). (OK, this is another exception to my claim that releaseResources() handles nulling out "all" fields....) 2. Even if the output future is done, processCompleted() sometimes needs access to the original futures in order to see whether any of them failed. To handle this, I store them in the listener and pass them through to processCompleted() (when necessary). The changes to prod code are net negative in line count, at least ignoring the added comments. I've also added a couple tests, only one of which passed before this CL. And I think the model for when fields are nulled out is overall clearer after this CL. So hopefully this CL is a step forward, despite the complexity of the changes and the remaining complexity in the code. (I also included a few unrelated simplifications, like not bothering to check collectsValues before calling collectOneValue(...).) (Aside: This CL's releaseResources() is like our proposed afterCommit() API but different. First, releaseResources() may be called even before set() or setAsync() in the CombinedFuture case. Second, CombinedFuture may rely on the fact that it's called twice in some cases: It's called once when all inputs complete, but it doesn't null out `task`, and then it can be called again if the output is cancelled, at which point it *does* null out `task`. But that probably doesn't matter too much because the task was probably handed to an executor in the meantime, so CombinedFuture is unlikely to hold the final reference to it. Anyway, for more discussion of afterCommit(), see https://github.com/google/guava/issues/2886) [] RELNOTES=n/a ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=266130757 --- .../common/util/concurrent/FuturesTest.java | 98 +++- .../util/concurrent/AggregateFuture.java | 440 ++++++++++-------- .../util/concurrent/AggregateFutureState.java | 28 +- .../util/concurrent/CollectionFuture.java | 98 ++-- .../util/concurrent/CombinedFuture.java | 95 ++-- .../util/concurrent/AggregateFutureState.java | 6 +- .../util/concurrent/FuturesTest_gwt.java | 27 ++ .../common/util/concurrent/FuturesTest.java | 98 +++- .../util/concurrent/AggregateFuture.java | 439 +++++++++-------- .../util/concurrent/AggregateFutureState.java | 28 +- .../util/concurrent/CollectionFuture.java | 98 ++-- .../util/concurrent/CombinedFuture.java | 95 ++-- 12 files changed, 882 insertions(+), 668 deletions(-) diff --git a/android/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java b/android/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java index 089fa1dc6676..dc319016bf2f 100644 --- a/android/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java +++ b/android/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java @@ -46,6 +46,7 @@ import static com.google.common.util.concurrent.TestPlatform.getDoneFromTimeoutOverload; import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; +import static com.google.common.util.concurrent.testing.TestingExecutors.noOpScheduledExecutor; import static java.util.Arrays.asList; import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; @@ -2803,33 +2804,82 @@ public ListenableFuture call() throws Exception { @AndroidIncompatible @GwtIncompatible - public void testWhenAllSucceed_releasesMemory() throws Exception { + public void testWhenAllSucceed_releasesInputFuturesUponSubmission() throws Exception { SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); WeakReference> future1Ref = new WeakReference<>(future1); WeakReference> future2Ref = new WeakReference<>(future2); - AsyncCallable combiner = - new AsyncCallable() { + Callable combiner = + new Callable() { @Override - public ListenableFuture call() throws Exception { - return SettableFuture.create(); + public Long call() { + throw new AssertionError(); } }; ListenableFuture unused = - whenAllSucceed(future1, future2).callAsync(combiner, directExecutor()); + whenAllSucceed(future1, future2).call(combiner, noOpScheduledExecutor()); future1.set(1L); future1 = null; future2.set(2L); future2 = null; - // Futures should be collected even if combiner future never finishes. + /* + * Futures should be collected even if combiner never runs. This is kind of a silly test, since + * the combiner is almost certain to hold its own reference to the futures, and a real app would + * hold a reference to the executor and thus to the combiner. What we really care about is that + * the futures are released once the combiner is done running. But we happen to provide this + * earlier cleanup at the moment, so we're testing it. + */ GcFinalization.awaitClear(future1Ref); GcFinalization.awaitClear(future2Ref); } + @AndroidIncompatible + @GwtIncompatible + public void testWhenAllComplete_releasesInputFuturesUponCancellation() throws Exception { + SettableFuture future = SettableFuture.create(); + WeakReference> futureRef = new WeakReference<>(future); + + Callable combiner = + new Callable() { + @Override + public Long call() { + throw new AssertionError(); + } + }; + + ListenableFuture unused = whenAllComplete(future).call(combiner, noOpScheduledExecutor()); + + unused.cancel(false); + future = null; + + // Future should be collected because whenAll*Complete* doesn't need to look at its result. + GcFinalization.awaitClear(futureRef); + } + + @AndroidIncompatible + @GwtIncompatible + public void testWhenAllSucceed_releasesCallable() throws Exception { + AsyncCallable combiner = + new AsyncCallable() { + @Override + public ListenableFuture call() { + return SettableFuture.create(); + } + }; + WeakReference> combinerRef = new WeakReference<>(combiner); + + ListenableFuture unused = + whenAllSucceed(immediateFuture(1L)).callAsync(combiner, directExecutor()); + + combiner = null; + // combiner should be collected even if the future it returns never completes. + GcFinalization.awaitClear(combinerRef); + } + /* * TODO(cpovirk): maybe pass around TestFuture instances instead of * ListenableFuture instances @@ -3463,6 +3513,24 @@ public void testSuccessfulAsList_logging_error() throws Exception { assertThat(logged.get(0).getThrown()).isInstanceOf(MyError.class); } + public void testSuccessfulAsList_failureLoggedEvenAfterOutputCancelled() throws Exception { + ListenableFuture input = new CancelPanickingFuture<>(); + ListenableFuture> output = successfulAsList(input); + output.cancel(false); + + List logged = aggregateFutureLogHandler.getStoredLogRecords(); + assertThat(logged).hasSize(1); + assertThat(logged.get(0).getThrown()).hasMessageThat().isEqualTo("You can't fire me, I quit."); + } + + private static final class CancelPanickingFuture extends AbstractFuture { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + setException(new Error("You can't fire me, I quit.")); + return false; + } + } + public void testNonCancellationPropagating_successful() throws Exception { SettableFuture input = SettableFuture.create(); ListenableFuture wrapper = nonCancellationPropagating(input); @@ -3517,22 +3585,6 @@ private static class TestException extends Exception { } } - @GwtIncompatible // used only in GwtIncompatible tests - private static final Function mapper = - new Function() { - @Override - public TestException apply(Exception from) { - if (from instanceof ExecutionException) { - return new TestException(from.getCause()); - } else { - assertTrue( - "got " + from.getClass(), - from instanceof InterruptedException || from instanceof CancellationException); - return new TestException(from); - } - } - }; - @GwtIncompatible // used only in GwtIncompatible tests private interface MapperFunction extends Function {} diff --git a/android/guava/src/com/google/common/util/concurrent/AggregateFuture.java b/android/guava/src/com/google/common/util/concurrent/AggregateFuture.java index 18e0e9e898b2..ae3fde8858c9 100644 --- a/android/guava/src/com/google/common/util/concurrent/AggregateFuture.java +++ b/android/guava/src/com/google/common/util/concurrent/AggregateFuture.java @@ -16,8 +16,11 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.ALL_INPUT_FUTURES_PROCESSED; +import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE; import static com.google.common.util.concurrent.Futures.getDone; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static java.util.logging.Level.SEVERE; import com.google.common.annotations.GwtCompatible; import com.google.common.collect.ImmutableCollection; @@ -26,270 +29,303 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.logging.Level; import java.util.logging.Logger; import org.checkerframework.checker.nullness.compatqual.NullableDecl; /** - * A future made up of a collection of sub-futures. + * A future whose value is derived from a collection of input futures. * * @param the type of the individual inputs * @param the type of the output (i.e. this) future */ @GwtCompatible -abstract class AggregateFuture extends AbstractFuture.TrustedFuture { +abstract class AggregateFuture extends AggregateFutureState { private static final Logger logger = Logger.getLogger(AggregateFuture.class.getName()); + /** + * The input futures. After {@link #init}, this field is read only by {@link #afterDone()} (to + * propagate cancellation) and {@link #toString()}. To access the futures' values, {@code + * AggregateFuture} attaches listeners that hold references to one or more inputs. And in the case + * of {@link CombinedFuture}, the user-supplied callback usually has its own references to inputs. + */ /* * In certain circumstances, this field might theoretically not be visible to an afterDone() call * triggered by cancel(). For details, see the comments on the fields of TimeoutFuture. */ - @NullableDecl private RunningState runningState; + @NullableDecl private ImmutableCollection> futures; + + private final boolean allMustSucceed; + private final boolean collectsValues; + + AggregateFuture( + ImmutableCollection> futures, + boolean allMustSucceed, + boolean collectsValues) { + super(futures.size()); + this.futures = checkNotNull(futures); + this.allMustSucceed = allMustSucceed; + this.collectsValues = collectsValues; + } @Override protected final void afterDone() { super.afterDone(); - releaseResources(); - } - - protected final void releaseResources() { - RunningState localRunningState = runningState; - if (localRunningState != null) { - // Let go of the memory held by the running state - this.runningState = null; - ImmutableCollection> futures = - localRunningState.futures; - boolean wasInterrupted = wasInterrupted(); - if (wasInterrupted) { - localRunningState.interruptTask(); - } + ImmutableCollection> localFutures = futures; + releaseResources(OUTPUT_FUTURE_DONE); // nulls out `futures` - if (isCancelled() & futures != null) { - for (ListenableFuture future : futures) { - future.cancel(wasInterrupted); - } + if (isCancelled() & localFutures != null) { + boolean wasInterrupted = wasInterrupted(); + for (Future future : localFutures) { + future.cancel(wasInterrupted); } } + /* + * We don't call clearSeenExceptions() until processCompleted(). Prior to that, it may be needed + * again if some outstanding input fails. + */ } @Override - protected String pendingToString() { - RunningState localRunningState = runningState; - if (localRunningState == null) { - return null; - } - ImmutableCollection> localFutures = - localRunningState.futures; + protected final String pendingToString() { + ImmutableCollection> localFutures = futures; if (localFutures != null) { return "futures=[" + localFutures + "]"; } return null; } - /** Must be called at the end of each sub-class's constructor. */ - final void init(RunningState runningState) { - this.runningState = runningState; - runningState.init(); - } - - abstract class RunningState extends AggregateFutureState implements Runnable { - private ImmutableCollection> futures; - private final boolean allMustSucceed; - private final boolean collectsValues; - - RunningState( - ImmutableCollection> futures, - boolean allMustSucceed, - boolean collectsValues) { - super(futures.size()); - this.futures = checkNotNull(futures); - this.allMustSucceed = allMustSucceed; - this.collectsValues = collectsValues; - } - - /* Used in the !allMustSucceed case so we don't have to instantiate a listener. */ - @Override - public final void run() { - decrementCountAndMaybeComplete(); + /** + * Must be called at the end of each subclass's constructor. This method performs the "real" + * initialization; we can't put this in the constructor because, in the case where futures are + * already complete, we would not initialize the subclass before calling {@link + * #collectValueFromNonCancelledFuture}. As this is called after the subclass is constructed, + * we're guaranteed to have properly initialized the subclass. + */ + final void init() { + // Corner case: List is empty. + if (futures.isEmpty()) { + handleAllCompleted(); + return; } - /** - * The "real" initialization; we can't put this in the constructor because, in the case where - * futures are already complete, we would not initialize the subclass before calling {@link - * #handleOneInputDone}. As this is called after the subclass is constructed, we're guaranteed - * to have properly initialized the subclass. - */ - private void init() { - // Corner case: List is empty. - if (futures.isEmpty()) { - handleAllCompleted(); - return; - } - - // NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll - // need to handle RejectedExecutionException + // NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll + // need to handle RejectedExecutionException - if (allMustSucceed) { - // We need fail fast, so we have to keep track of which future failed so we can propagate - // the exception immediately + if (allMustSucceed) { + // We need fail fast, so we have to keep track of which future failed so we can propagate + // the exception immediately - // Register a listener on each Future in the list to update the state of this future. - // Note that if all the futures on the list are done prior to completing this loop, the last - // call to addListener() will callback to setOneValue(), transitively call our cleanup - // listener, and set this.futures to null. - // This is not actually a problem, since the foreach only needs this.futures to be non-null - // at the beginning of the loop. - int i = 0; - for (final ListenableFuture listenable : futures) { - final int index = i++; - listenable.addListener( - new Runnable() { - @Override - public void run() { - try { - handleOneInputDone(index, listenable); - } finally { - decrementCountAndMaybeComplete(); + // Register a listener on each Future in the list to update the state of this future. + // Note that if all the futures on the list are done prior to completing this loop, the last + // call to addListener() will callback to setOneValue(), transitively call our cleanup + // listener, and set this.futures to null. + // This is not actually a problem, since the foreach only needs this.futures to be non-null + // at the beginning of the loop. + int i = 0; + for (final ListenableFuture future : futures) { + final int index = i++; + future.addListener( + new Runnable() { + @Override + public void run() { + try { + if (future.isCancelled()) { + // Clear futures prior to cancelling children. This sets our own state but lets + // the input futures keep running, as some of them may be used elsewhere. + futures = null; + cancel(false); + } else { + collectValueFromNonCancelledFuture(index, future); } + } finally { + /* + * "null" means: There is no need to access `futures` again during + * `processCompleted` because we're reading each value during a call to + * handleOneInputDone. + */ + decrementCountAndMaybeComplete(null); } - }, - directExecutor()); - } - } else { - // We'll only call the callback when all futures complete, regardless of whether some failed - // Hold off on calling setOneValue until all complete, so we can share the same listener - for (ListenableFuture listenable : futures) { - listenable.addListener(this, directExecutor()); - } + } + }, + directExecutor()); + } + } else { + /* + * We'll call the user callback or collect the values only when all inputs complete, + * regardless of whether some failed. This lets us avoid calling expensive methods like + * Future.get() when we don't need to (specifically, for whenAllComplete().call*()), and it + * lets all futures share the same listener. + * + * We store `localFutures` inside the listener because `this.futures` might be nulled out by + * the time the listener runs for the final future -- at which point we need to check all + * inputs for exceptions *if* we're collecting values. If we're not, then the listener doesn't + * need access to the futures again, so we can just pass `null`. + * + * TODO(b/112550045): Allocating a single, cheaper listener is (I think) only an optimization. + * If we make some other optimizations, this one will no longer be necessary. The optimization + * could actually hurt in some cases, as it forces us to keep all inputs in memory until the + * final input completes. + */ + final ImmutableCollection> localFutures = + collectsValues ? futures : null; + Runnable listener = + new Runnable() { + @Override + public void run() { + decrementCountAndMaybeComplete(localFutures); + } + }; + for (ListenableFuture future : futures) { + future.addListener(listener, directExecutor()); } } + } - /** - * Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the - * throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the - * throwable did not cause this future to fail, and it is the first time we've seen that - * particular Throwable. - */ - private void handleException(Throwable throwable) { - checkNotNull(throwable); + /** + * Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the + * throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the + * throwable did not cause this future to fail, and it is the first time we've seen that + * particular Throwable. + */ + private void handleException(Throwable throwable) { + checkNotNull(throwable); - boolean completedWithFailure = false; - boolean firstTimeSeeingThisException = true; - if (allMustSucceed) { - // As soon as the first one fails, throw the exception up. - // The result of all other inputs is then ignored. - completedWithFailure = setException(throwable); - if (completedWithFailure) { - releaseResourcesAfterFailure(); - } else { - // Go up the causal chain to see if we've already seen this cause; if we have, even if - // it's wrapped by a different exception, don't log it. - firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable); + if (allMustSucceed) { + // As soon as the first one fails, make that failure the result of the output future. + // The results of all other inputs are then ignored (except for logging any failures). + boolean completedWithFailure = setException(throwable); + if (!completedWithFailure) { + // Go up the causal chain to see if we've already seen this cause; if we have, even if + // it's wrapped by a different exception, don't log it. + boolean firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable); + if (firstTimeSeeingThisException) { + log(throwable); + return; } } - - // | and & used because it's faster than the branch required for || and && - if (throwable instanceof Error - | (allMustSucceed & !completedWithFailure & firstTimeSeeingThisException)) { - String message = - (throwable instanceof Error) - ? "Input Future failed with Error" - : "Got more than one input Future failure. Logging failures after the first"; - logger.log(Level.SEVERE, message, throwable); - } } - @Override - final void addInitialException(Set seen) { - if (!isCancelled()) { - // TODO(cpovirk): Think about whether we could/should use Verify to check this. - boolean unused = addCausalChain(seen, tryInternalFastPathGetFailure()); - } + /* + * TODO(cpovirk): Should whenAllComplete().call*() log errors, too? Currently, it doesn't call + * handleException() at all. + */ + if (throwable instanceof Error) { + /* + * TODO(cpovirk): Do we really want to log this if we called setException(throwable) and it + * returned true? This was intentional (CL 46470009), but it seems odd compared to how we + * normally handle Error. + * + * Similarly, do we really want to log the same Error more than once? + */ + log(throwable); } + } - /** Handles the input at the given index completing. */ - private void handleOneInputDone(int index, Future future) { - // The only cases in which this Future should already be done are (a) if it was cancelled or - // (b) if an input failed and we propagated that immediately because of allMustSucceed. - checkState( - allMustSucceed || !isDone() || isCancelled(), - "Future was done before all dependencies completed"); + private static void log(Throwable throwable) { + String message = + (throwable instanceof Error) + ? "Input Future failed with Error" + : "Got more than one input Future failure. Logging failures after the first"; + logger.log(SEVERE, message, throwable); + } - try { - checkState(future.isDone(), "Tried to set value from future which is not done"); - if (allMustSucceed) { - if (future.isCancelled()) { - // clear running state prior to cancelling children, this sets our own state but lets - // the input futures keep running as some of them may be used elsewhere. - runningState = null; - cancel(false); - } else { - // We always get the result so that we can have fail-fast, even if we don't collect - InputT result = getDone(future); - if (collectsValues) { - collectOneValue(allMustSucceed, index, result); - } - } - } else if (collectsValues && !future.isCancelled()) { - collectOneValue(allMustSucceed, index, getDone(future)); - } - } catch (ExecutionException e) { - handleException(e.getCause()); - } catch (Throwable t) { - handleException(t); - } + @Override + final void addInitialException(Set seen) { + checkNotNull(seen); + if (!isCancelled()) { + // TODO(cpovirk): Think about whether we could/should use Verify to check this. + boolean unused = addCausalChain(seen, tryInternalFastPathGetFailure()); } + } - private void decrementCountAndMaybeComplete() { - int newRemaining = decrementRemainingAndGet(); - checkState(newRemaining >= 0, "Less than 0 remaining futures"); - if (newRemaining == 0) { - processCompleted(); - } + /** + * Collects the result (success or failure) of one input future. The input must not have been + * cancelled. For details on when this is called, see {@link #collectOneValue}. + */ + private void collectValueFromNonCancelledFuture(int index, Future future) { + try { + // We get the result, even if collectOneValue is a no-op, so that we can fail fast. + collectOneValue(index, getDone(future)); + } catch (ExecutionException e) { + handleException(e.getCause()); + } catch (Throwable t) { + handleException(t); } + } + + private void decrementCountAndMaybeComplete( + @NullableDecl + ImmutableCollection> + futuresIfNeedToCollectAtCompletion) { + int newRemaining = decrementRemainingAndGet(); + checkState(newRemaining >= 0, "Less than 0 remaining futures"); + if (newRemaining == 0) { + processCompleted(futuresIfNeedToCollectAtCompletion); + } + } - private void processCompleted() { - // Collect the values if (a) our output requires collecting them and (b) we haven't been - // collecting them as we go. (We've collected them as we go only if we needed to fail fast) - if (collectsValues & !allMustSucceed) { - int i = 0; - for (ListenableFuture listenable : futures) { - handleOneInputDone(i++, listenable); + private void processCompleted( + @NullableDecl + ImmutableCollection> + futuresIfNeedToCollectAtCompletion) { + if (futuresIfNeedToCollectAtCompletion != null) { + int i = 0; + for (Future future : futuresIfNeedToCollectAtCompletion) { + if (!future.isCancelled()) { + collectValueFromNonCancelledFuture(i, future); } + i++; } - handleAllCompleted(); } - - /** - * Listeners implicitly keep a reference to {@link RunningState} as they're inner classes, so we - * free resources here as well for the allMustSucceed=true case (i.e. when a future fails, we - * immediately release resources we no longer need); additionally, the future will release its - * reference to {@link RunningState}, which should free all associated memory when all the - * futures complete and the listeners are released. - * - *

TODO(user): Write tests for memory retention + clearSeenExceptions(); + handleAllCompleted(); + /* + * Null out fields, including some used in handleAllCompleted() above (like + * `CollectionFuture.values`). This might be a no-op: If this future completed during + * handleAllCompleted(), they will already have been nulled out. But in the case of + * whenAll*().call*(), this future may be pending until the callback runs -- or even longer in + * the case of callAsync(), which waits for the callback's returned future to complete. */ - @ForOverride - @OverridingMethodsMustInvokeSuper - void releaseResourcesAfterFailure() { - this.futures = null; - } + releaseResources(ALL_INPUT_FUTURES_PROCESSED); + } - /** - * Called only if {@code collectsValues} is true. - * - *

If {@code allMustSucceed} is true, called as each future completes; otherwise, called for - * each future when all futures complete. + /** + * Clears fields that are no longer needed after this future has completed -- or at least all its + * inputs have completed (more precisely, after {@link #handleAllCompleted()} has been called). + * Often called multiple times (that is, both when the inputs complete and when the output + * completes). + * + *

This is similar to our proposed {@code afterCommit} method but not quite the same. See the + * description of CL 265462958. + */ + // TODO(user): Write more tests for memory retention. + @ForOverride + @OverridingMethodsMustInvokeSuper + void releaseResources(ReleaseResourcesReason reason) { + checkNotNull(reason); + /* + * All elements of `futures` are completed, or this future has already completed and read + * `futures` into a local variable (in preparation for propagating cancellation to them). In + * either case, no one needs to read `futures` for cancellation purposes later. (And + * cancellation purposes are the main reason to access `futures`, as discussed in its docs.) */ - abstract void collectOneValue( - boolean allMustSucceed, int index, @NullableDecl InputT returnValue); - - abstract void handleAllCompleted(); + this.futures = null; + } - void interruptTask() {} + enum ReleaseResourcesReason { + OUTPUT_FUTURE_DONE, + ALL_INPUT_FUTURES_PROCESSED, } + /** + * If {@code allMustSucceed} is true, called as each future completes; otherwise, if {@code + * collectsValues} is true, called for each future when all futures complete. + */ + abstract void collectOneValue(int index, @NullableDecl InputT returnValue); + + abstract void handleAllCompleted(); + /** Adds the chain to the seen set, and returns whether all the chain was new to us. */ private static boolean addCausalChain(Set seen, Throwable t) { for (; t != null; t = t.getCause()) { diff --git a/android/guava/src/com/google/common/util/concurrent/AggregateFutureState.java b/android/guava/src/com/google/common/util/concurrent/AggregateFutureState.java index 040d81363c47..f8398d817eb2 100644 --- a/android/guava/src/com/google/common/util/concurrent/AggregateFutureState.java +++ b/android/guava/src/com/google/common/util/concurrent/AggregateFutureState.java @@ -37,9 +37,9 @@ */ @GwtCompatible(emulated = true) @ReflectionSupport(value = ReflectionSupport.Level.FULL) -abstract class AggregateFutureState { +abstract class AggregateFutureState extends AbstractFuture.TrustedFuture { // Lazily initialized the first time we see an exception; not released until all the input futures - // & this future completes. Released when the future releases the reference to the running state + // have completed and we have processed them all. private volatile Set seenExceptions = null; private volatile int remaining; @@ -89,12 +89,27 @@ final Set getOrInitSeenExceptions() { * Thread2: calls setException(), which returns false, CASes seenExceptions to its exception, * and wrongly believes that its exception is new (leading it to logging it when it shouldn't) * - * Our solution is for threads to CAS seenExceptions from null to a Set population with _the + * Our solution is for threads to CAS seenExceptions from null to a Set populated with _the * initial exception_, no matter which thread does the work. This ensures that seenExceptions * always contains not just the current thread's exception but also the initial thread's. */ Set seenExceptionsLocal = seenExceptions; if (seenExceptionsLocal == null) { + // TODO(cpovirk): Should we use a simpler (presumably cheaper) data structure? + /* + * Using weak references here could let us release exceptions earlier, but: + * + * 1. On Android, querying a WeakReference blocks if the GC is doing an otherwise-concurrent + * pass. + * + * 2. We would probably choose to compare exceptions using == instead of equals() (for + * consistency with how weak references are cleared). That's a behavior change -- arguably the + * removal of a feature. + * + * Fortunately, exceptions rarely contain references to expensive resources. + */ + + // seenExceptionsLocal = newConcurrentHashSet(); /* * Other handleException() callers may see this as soon as we publish it. We need to populate @@ -122,6 +137,10 @@ final int decrementRemainingAndGet() { return ATOMIC_HELPER.decrementAndGetRemainingCount(this); } + final void clearSeenExceptions() { + seenExceptions = null; + } + private abstract static class AtomicHelper { /** Atomic compare-and-set of the {@link AggregateFutureState#seenExceptions} field. */ abstract void compareAndSetSeenExceptions( @@ -169,8 +188,7 @@ void compareAndSetSeenExceptions( @Override int decrementAndGetRemainingCount(AggregateFutureState state) { synchronized (state) { - state.remaining--; - return state.remaining; + return --state.remaining; } } } diff --git a/android/guava/src/com/google/common/util/concurrent/CollectionFuture.java b/android/guava/src/com/google/common/util/concurrent/CollectionFuture.java index 97476d59b2d1..e2d2f2ffd9d1 100644 --- a/android/guava/src/com/google/common/util/concurrent/CollectionFuture.java +++ b/android/guava/src/com/google/common/util/concurrent/CollectionFuture.java @@ -14,7 +14,6 @@ package com.google.common.util.concurrent; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Lists.newArrayListWithCapacity; import static java.util.Collections.unmodifiableList; @@ -29,83 +28,64 @@ /** Aggregate future that collects (stores) results of each future. */ @GwtCompatible(emulated = true) abstract class CollectionFuture extends AggregateFuture { + private List> values; - abstract class CollectionFutureRunningState extends RunningState { - private List> values; + CollectionFuture( + ImmutableCollection> futures, + boolean allMustSucceed) { + super(futures, allMustSucceed, true); - CollectionFutureRunningState( - ImmutableCollection> futures, - boolean allMustSucceed) { - super(futures, allMustSucceed, true); - - this.values = - futures.isEmpty() - ? ImmutableList.>of() - : Lists.>newArrayListWithCapacity(futures.size()); + this.values = + futures.isEmpty() + ? ImmutableList.>of() + : Lists.>newArrayListWithCapacity(futures.size()); - // Populate the results list with null initially. - for (int i = 0; i < futures.size(); ++i) { - values.add(null); - } + // Populate the results list with null initially. + for (int i = 0; i < futures.size(); ++i) { + values.add(null); } + } - @Override - final void collectOneValue(boolean allMustSucceed, int index, @NullableDecl V returnValue) { - List> localValues = values; - - if (localValues != null) { - localValues.set(index, Optional.fromNullable(returnValue)); - } else { - // Some other future failed or has been cancelled, causing this one to also be cancelled or - // have an exception set. This should only happen if allMustSucceed is true or if the output - // itself has been cancelled. - checkState( - allMustSucceed || isCancelled(), "Future was done before all dependencies completed"); - } - } - - @Override - final void handleAllCompleted() { - List> localValues = values; - if (localValues != null) { - set(combine(localValues)); - } else { - checkState(isDone()); - } + @Override + final void collectOneValue(int index, @NullableDecl V returnValue) { + List> localValues = values; + if (localValues != null) { + localValues.set(index, Optional.fromNullable(returnValue)); } + } - @Override - void releaseResourcesAfterFailure() { - super.releaseResourcesAfterFailure(); - this.values = null; + @Override + final void handleAllCompleted() { + List> localValues = values; + if (localValues != null) { + set(combine(localValues)); } + } - abstract C combine(List> values); + @Override + void releaseResources(ReleaseResourcesReason reason) { + super.releaseResources(reason); + this.values = null; } + abstract C combine(List> values); + /** Used for {@link Futures#allAsList} and {@link Futures#successfulAsList}. */ static final class ListFuture extends CollectionFuture> { ListFuture( ImmutableCollection> futures, boolean allMustSucceed) { - init(new ListFutureRunningState(futures, allMustSucceed)); + super(futures, allMustSucceed); + init(); } - private final class ListFutureRunningState extends CollectionFutureRunningState { - ListFutureRunningState( - ImmutableCollection> futures, - boolean allMustSucceed) { - super(futures, allMustSucceed); - } - - @Override - public List combine(List> values) { - List result = newArrayListWithCapacity(values.size()); - for (Optional element : values) { - result.add(element != null ? element.orNull() : null); - } - return unmodifiableList(result); + @Override + public List combine(List> values) { + List result = newArrayListWithCapacity(values.size()); + for (Optional element : values) { + result.add(element != null ? element.orNull() : null); } + return unmodifiableList(result); } } } diff --git a/android/guava/src/com/google/common/util/concurrent/CombinedFuture.java b/android/guava/src/com/google/common/util/concurrent/CombinedFuture.java index ded4b77b214f..38e939476214 100644 --- a/android/guava/src/com/google/common/util/concurrent/CombinedFuture.java +++ b/android/guava/src/com/google/common/util/concurrent/CombinedFuture.java @@ -15,7 +15,7 @@ package com.google.common.util.concurrent; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; +import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE; import com.google.common.annotations.GwtCompatible; import com.google.common.collect.ImmutableCollection; @@ -30,16 +30,16 @@ /** Aggregate future that computes its value by calling a callable. */ @GwtCompatible final class CombinedFuture extends AggregateFuture { + private CombinedFutureInterruptibleTask task; + CombinedFuture( ImmutableCollection> futures, boolean allMustSucceed, Executor listenerExecutor, AsyncCallable callable) { - init( - new CombinedFutureRunningState( - futures, - allMustSucceed, - new AsyncCallableInterruptibleTask(callable, listenerExecutor))); + super(futures, allMustSucceed, false); + this.task = new AsyncCallableInterruptibleTask(callable, listenerExecutor); + init(); } CombinedFuture( @@ -47,47 +47,42 @@ final class CombinedFuture extends AggregateFuture { boolean allMustSucceed, Executor listenerExecutor, Callable callable) { - init( - new CombinedFutureRunningState( - futures, allMustSucceed, new CallableInterruptibleTask(callable, listenerExecutor))); + super(futures, allMustSucceed, false); + this.task = new CallableInterruptibleTask(callable, listenerExecutor); + init(); } - private final class CombinedFutureRunningState extends RunningState { - private CombinedFutureInterruptibleTask task; - - CombinedFutureRunningState( - ImmutableCollection> futures, - boolean allMustSucceed, - CombinedFutureInterruptibleTask task) { - super(futures, allMustSucceed, false); - this.task = task; - } + @Override + void collectOneValue(int index, @NullableDecl Object returnValue) {} - @Override - void collectOneValue(boolean allMustSucceed, int index, @NullableDecl Object returnValue) {} - - @Override - void handleAllCompleted() { - CombinedFutureInterruptibleTask localTask = task; - if (localTask != null) { - localTask.execute(); - } else { - checkState(isDone()); - } + @Override + void handleAllCompleted() { + CombinedFutureInterruptibleTask localTask = task; + if (localTask != null) { + localTask.execute(); } + } - @Override - void releaseResourcesAfterFailure() { - super.releaseResourcesAfterFailure(); + @Override + void releaseResources(ReleaseResourcesReason reason) { + super.releaseResources(reason); + /* + * If the output future is done, then it won't need to interrupt the task later, so it can clear + * its reference to it. + * + * If the output future is *not* done, then the task field will be cleared after the task runs + * or after the output future is done, whichever comes first. + */ + if (reason == OUTPUT_FUTURE_DONE) { this.task = null; } + } - @Override - void interruptTask() { - CombinedFutureInterruptibleTask localTask = task; - if (localTask != null) { - localTask.interruptTask(); - } + @Override + protected void interruptTask() { + CombinedFutureInterruptibleTask localTask = task; + if (localTask != null) { + localTask.interruptTask(); } } @@ -96,7 +91,7 @@ private abstract class CombinedFutureInterruptibleTask extends InterruptibleT private final Executor listenerExecutor; boolean thrownByExecute = true; - public CombinedFutureInterruptibleTask(Executor listenerExecutor) { + CombinedFutureInterruptibleTask(Executor listenerExecutor) { this.listenerExecutor = checkNotNull(listenerExecutor); } @@ -117,6 +112,19 @@ final void execute() { @Override final void afterRanInterruptibly(T result, Throwable error) { + /* + * The future no longer needs to interrupt this task, so it no longer needs a reference to it. + * + * TODO(cpovirk): It might be nice for our InterruptibleTask subclasses to null out their + * `callable` fields automatically. That would make it less important for us to null out the + * reference to `task` here (though it's still nice to do so in case our reference to the + * executor keeps it alive). Ideally, nulling out `callable` would be the responsibility of + * InterruptibleTask itself so that its other subclasses also benefit. (Handling `callable` in + * InterruptibleTask itself might also eliminate some of the existing boilerplate for, e.g., + * pendingToString().) + */ + CombinedFuture.this.task = null; + if (error != null) { if (error instanceof ExecutionException) { setException(error.getCause()); @@ -138,7 +146,7 @@ private final class AsyncCallableInterruptibleTask extends CombinedFutureInterruptibleTask> { private final AsyncCallable callable; - public AsyncCallableInterruptibleTask(AsyncCallable callable, Executor listenerExecutor) { + AsyncCallableInterruptibleTask(AsyncCallable callable, Executor listenerExecutor) { super(listenerExecutor); this.callable = checkNotNull(callable); } @@ -157,9 +165,6 @@ ListenableFuture runInterruptibly() throws Exception { @Override void setValue(ListenableFuture value) { setFuture(value); - // Eagerly release resources instead of waiting for afterDone. We are done with the inputs, - // but the actual future may not complete for arbitrarily long. - releaseResources(); } @Override @@ -172,7 +177,7 @@ String toPendingString() { private final class CallableInterruptibleTask extends CombinedFutureInterruptibleTask { private final Callable callable; - public CallableInterruptibleTask(Callable callable, Executor listenerExecutor) { + CallableInterruptibleTask(Callable callable, Executor listenerExecutor) { super(listenerExecutor); this.callable = checkNotNull(callable); } diff --git a/guava-gwt/src-super/com/google/common/util/concurrent/super/com/google/common/util/concurrent/AggregateFutureState.java b/guava-gwt/src-super/com/google/common/util/concurrent/super/com/google/common/util/concurrent/AggregateFutureState.java index 91c603b90e0a..0058c34c03dc 100644 --- a/guava-gwt/src-super/com/google/common/util/concurrent/super/com/google/common/util/concurrent/AggregateFutureState.java +++ b/guava-gwt/src-super/com/google/common/util/concurrent/super/com/google/common/util/concurrent/AggregateFutureState.java @@ -21,7 +21,7 @@ import java.util.Set; /** Emulation of AggregateFutureState. */ -abstract class AggregateFutureState { +abstract class AggregateFutureState extends AbstractFuture.TrustedFuture { // Lazily initialized the first time we see an exception; not released until all the input futures // & this future completes. Released when the future releases the reference to the running state private Set seenExceptions = null; @@ -44,4 +44,8 @@ final Set getOrInitSeenExceptions() { final int decrementRemainingAndGet() { return --remaining; } + + final void clearSeenExceptions() { + seenExceptions = null; + } } diff --git a/guava-gwt/test/com/google/common/util/concurrent/FuturesTest_gwt.java b/guava-gwt/test/com/google/common/util/concurrent/FuturesTest_gwt.java index dacbfa1b07f2..427eed9f70ca 100644 --- a/guava-gwt/test/com/google/common/util/concurrent/FuturesTest_gwt.java +++ b/guava-gwt/test/com/google/common/util/concurrent/FuturesTest_gwt.java @@ -2016,6 +2016,33 @@ public void testSuccessfulAsList_emptyList() throws Exception { } } +public void testSuccessfulAsList_failureLoggedEvenAfterOutputCancelled() throws Exception { + com.google.common.util.concurrent.FuturesTest testCase = new com.google.common.util.concurrent.FuturesTest(); + testCase.setUp(); + Throwable failure = null; + try { + testCase.testSuccessfulAsList_failureLoggedEvenAfterOutputCancelled(); + } catch (Throwable t) { + failure = t; + } + try { + testCase.tearDown(); + } catch (Throwable t) { + if (failure == null) { + failure = t; + } + } + if (failure instanceof Exception) { + throw (Exception) failure; + } + if (failure instanceof Error) { + throw (Error) failure; + } + if (failure != null) { + throw new RuntimeException(failure); + } +} + public void testSuccessfulAsList_logging_error() throws Exception { com.google.common.util.concurrent.FuturesTest testCase = new com.google.common.util.concurrent.FuturesTest(); testCase.setUp(); diff --git a/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java b/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java index 571bed062dde..1eef3d6505c2 100644 --- a/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java +++ b/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java @@ -46,6 +46,7 @@ import static com.google.common.util.concurrent.TestPlatform.getDoneFromTimeoutOverload; import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; +import static com.google.common.util.concurrent.testing.TestingExecutors.noOpScheduledExecutor; import static java.util.Arrays.asList; import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; @@ -2803,33 +2804,82 @@ public ListenableFuture call() throws Exception { @AndroidIncompatible @GwtIncompatible - public void testWhenAllSucceed_releasesMemory() throws Exception { + public void testWhenAllSucceed_releasesInputFuturesUponSubmission() throws Exception { SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); WeakReference> future1Ref = new WeakReference<>(future1); WeakReference> future2Ref = new WeakReference<>(future2); - AsyncCallable combiner = - new AsyncCallable() { + Callable combiner = + new Callable() { @Override - public ListenableFuture call() throws Exception { - return SettableFuture.create(); + public Long call() { + throw new AssertionError(); } }; ListenableFuture unused = - whenAllSucceed(future1, future2).callAsync(combiner, directExecutor()); + whenAllSucceed(future1, future2).call(combiner, noOpScheduledExecutor()); future1.set(1L); future1 = null; future2.set(2L); future2 = null; - // Futures should be collected even if combiner future never finishes. + /* + * Futures should be collected even if combiner never runs. This is kind of a silly test, since + * the combiner is almost certain to hold its own reference to the futures, and a real app would + * hold a reference to the executor and thus to the combiner. What we really care about is that + * the futures are released once the combiner is done running. But we happen to provide this + * earlier cleanup at the moment, so we're testing it. + */ GcFinalization.awaitClear(future1Ref); GcFinalization.awaitClear(future2Ref); } + @AndroidIncompatible + @GwtIncompatible + public void testWhenAllComplete_releasesInputFuturesUponCancellation() throws Exception { + SettableFuture future = SettableFuture.create(); + WeakReference> futureRef = new WeakReference<>(future); + + Callable combiner = + new Callable() { + @Override + public Long call() { + throw new AssertionError(); + } + }; + + ListenableFuture unused = whenAllComplete(future).call(combiner, noOpScheduledExecutor()); + + unused.cancel(false); + future = null; + + // Future should be collected because whenAll*Complete* doesn't need to look at its result. + GcFinalization.awaitClear(futureRef); + } + + @AndroidIncompatible + @GwtIncompatible + public void testWhenAllSucceed_releasesCallable() throws Exception { + AsyncCallable combiner = + new AsyncCallable() { + @Override + public ListenableFuture call() { + return SettableFuture.create(); + } + }; + WeakReference> combinerRef = new WeakReference<>(combiner); + + ListenableFuture unused = + whenAllSucceed(immediateFuture(1L)).callAsync(combiner, directExecutor()); + + combiner = null; + // combiner should be collected even if the future it returns never completes. + GcFinalization.awaitClear(combinerRef); + } + /* * TODO(cpovirk): maybe pass around TestFuture instances instead of * ListenableFuture instances @@ -3463,6 +3513,24 @@ public void testSuccessfulAsList_logging_error() throws Exception { assertThat(logged.get(0).getThrown()).isInstanceOf(MyError.class); } + public void testSuccessfulAsList_failureLoggedEvenAfterOutputCancelled() throws Exception { + ListenableFuture input = new CancelPanickingFuture<>(); + ListenableFuture> output = successfulAsList(input); + output.cancel(false); + + List logged = aggregateFutureLogHandler.getStoredLogRecords(); + assertThat(logged).hasSize(1); + assertThat(logged.get(0).getThrown()).hasMessageThat().isEqualTo("You can't fire me, I quit."); + } + + private static final class CancelPanickingFuture extends AbstractFuture { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + setException(new Error("You can't fire me, I quit.")); + return false; + } + } + public void testNonCancellationPropagating_successful() throws Exception { SettableFuture input = SettableFuture.create(); ListenableFuture wrapper = nonCancellationPropagating(input); @@ -3517,22 +3585,6 @@ private static class TestException extends Exception { } } - @GwtIncompatible // used only in GwtIncompatible tests - private static final Function mapper = - new Function() { - @Override - public TestException apply(Exception from) { - if (from instanceof ExecutionException) { - return new TestException(from.getCause()); - } else { - assertTrue( - "got " + from.getClass(), - from instanceof InterruptedException || from instanceof CancellationException); - return new TestException(from); - } - } - }; - @GwtIncompatible // used only in GwtIncompatible tests private interface MapperFunction extends Function {} diff --git a/guava/src/com/google/common/util/concurrent/AggregateFuture.java b/guava/src/com/google/common/util/concurrent/AggregateFuture.java index 90670271e9ea..c58abd1eb8e9 100644 --- a/guava/src/com/google/common/util/concurrent/AggregateFuture.java +++ b/guava/src/com/google/common/util/concurrent/AggregateFuture.java @@ -16,8 +16,11 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.ALL_INPUT_FUTURES_PROCESSED; +import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE; import static com.google.common.util.concurrent.Futures.getDone; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static java.util.logging.Level.SEVERE; import com.google.common.annotations.GwtCompatible; import com.google.common.collect.ImmutableCollection; @@ -26,269 +29,303 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.logging.Level; import java.util.logging.Logger; import org.checkerframework.checker.nullness.qual.Nullable; /** - * A future made up of a collection of sub-futures. + * A future whose value is derived from a collection of input futures. * * @param the type of the individual inputs * @param the type of the output (i.e. this) future */ @GwtCompatible -abstract class AggregateFuture extends AbstractFuture.TrustedFuture { +abstract class AggregateFuture extends AggregateFutureState { private static final Logger logger = Logger.getLogger(AggregateFuture.class.getName()); + /** + * The input futures. After {@link #init}, this field is read only by {@link #afterDone()} (to + * propagate cancellation) and {@link #toString()}. To access the futures' values, {@code + * AggregateFuture} attaches listeners that hold references to one or more inputs. And in the case + * of {@link CombinedFuture}, the user-supplied callback usually has its own references to inputs. + */ /* * In certain circumstances, this field might theoretically not be visible to an afterDone() call * triggered by cancel(). For details, see the comments on the fields of TimeoutFuture. */ - private @Nullable RunningState runningState; + private @Nullable ImmutableCollection> futures; + + private final boolean allMustSucceed; + private final boolean collectsValues; + + AggregateFuture( + ImmutableCollection> futures, + boolean allMustSucceed, + boolean collectsValues) { + super(futures.size()); + this.futures = checkNotNull(futures); + this.allMustSucceed = allMustSucceed; + this.collectsValues = collectsValues; + } @Override protected final void afterDone() { super.afterDone(); - releaseResources(); - } - - protected final void releaseResources() { - RunningState localRunningState = runningState; - if (localRunningState != null) { - // Let go of the memory held by the running state - this.runningState = null; - ImmutableCollection> futures = - localRunningState.futures; - boolean wasInterrupted = wasInterrupted(); - if (wasInterrupted) { - localRunningState.interruptTask(); - } + ImmutableCollection> localFutures = futures; + releaseResources(OUTPUT_FUTURE_DONE); // nulls out `futures` - if (isCancelled() & futures != null) { - for (ListenableFuture future : futures) { - future.cancel(wasInterrupted); - } + if (isCancelled() & localFutures != null) { + boolean wasInterrupted = wasInterrupted(); + for (Future future : localFutures) { + future.cancel(wasInterrupted); } } + /* + * We don't call clearSeenExceptions() until processCompleted(). Prior to that, it may be needed + * again if some outstanding input fails. + */ } @Override - protected String pendingToString() { - RunningState localRunningState = runningState; - if (localRunningState == null) { - return null; - } - ImmutableCollection> localFutures = - localRunningState.futures; + protected final String pendingToString() { + ImmutableCollection> localFutures = futures; if (localFutures != null) { return "futures=[" + localFutures + "]"; } return null; } - /** Must be called at the end of each sub-class's constructor. */ - final void init(RunningState runningState) { - this.runningState = runningState; - runningState.init(); - } - - abstract class RunningState extends AggregateFutureState implements Runnable { - private ImmutableCollection> futures; - private final boolean allMustSucceed; - private final boolean collectsValues; - - RunningState( - ImmutableCollection> futures, - boolean allMustSucceed, - boolean collectsValues) { - super(futures.size()); - this.futures = checkNotNull(futures); - this.allMustSucceed = allMustSucceed; - this.collectsValues = collectsValues; - } - - /* Used in the !allMustSucceed case so we don't have to instantiate a listener. */ - @Override - public final void run() { - decrementCountAndMaybeComplete(); + /** + * Must be called at the end of each subclass's constructor. This method performs the "real" + * initialization; we can't put this in the constructor because, in the case where futures are + * already complete, we would not initialize the subclass before calling {@link + * #collectValueFromNonCancelledFuture}. As this is called after the subclass is constructed, + * we're guaranteed to have properly initialized the subclass. + */ + final void init() { + // Corner case: List is empty. + if (futures.isEmpty()) { + handleAllCompleted(); + return; } - /** - * The "real" initialization; we can't put this in the constructor because, in the case where - * futures are already complete, we would not initialize the subclass before calling {@link - * #handleOneInputDone}. As this is called after the subclass is constructed, we're guaranteed - * to have properly initialized the subclass. - */ - private void init() { - // Corner case: List is empty. - if (futures.isEmpty()) { - handleAllCompleted(); - return; - } - - // NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll - // need to handle RejectedExecutionException + // NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll + // need to handle RejectedExecutionException - if (allMustSucceed) { - // We need fail fast, so we have to keep track of which future failed so we can propagate - // the exception immediately + if (allMustSucceed) { + // We need fail fast, so we have to keep track of which future failed so we can propagate + // the exception immediately - // Register a listener on each Future in the list to update the state of this future. - // Note that if all the futures on the list are done prior to completing this loop, the last - // call to addListener() will callback to setOneValue(), transitively call our cleanup - // listener, and set this.futures to null. - // This is not actually a problem, since the foreach only needs this.futures to be non-null - // at the beginning of the loop. - int i = 0; - for (final ListenableFuture listenable : futures) { - final int index = i++; - listenable.addListener( - new Runnable() { - @Override - public void run() { - try { - handleOneInputDone(index, listenable); - } finally { - decrementCountAndMaybeComplete(); + // Register a listener on each Future in the list to update the state of this future. + // Note that if all the futures on the list are done prior to completing this loop, the last + // call to addListener() will callback to setOneValue(), transitively call our cleanup + // listener, and set this.futures to null. + // This is not actually a problem, since the foreach only needs this.futures to be non-null + // at the beginning of the loop. + int i = 0; + for (final ListenableFuture future : futures) { + final int index = i++; + future.addListener( + new Runnable() { + @Override + public void run() { + try { + if (future.isCancelled()) { + // Clear futures prior to cancelling children. This sets our own state but lets + // the input futures keep running, as some of them may be used elsewhere. + futures = null; + cancel(false); + } else { + collectValueFromNonCancelledFuture(index, future); } + } finally { + /* + * "null" means: There is no need to access `futures` again during + * `processCompleted` because we're reading each value during a call to + * handleOneInputDone. + */ + decrementCountAndMaybeComplete(null); } - }, - directExecutor()); - } - } else { - // We'll only call the callback when all futures complete, regardless of whether some failed - // Hold off on calling setOneValue until all complete, so we can share the same listener - for (ListenableFuture listenable : futures) { - listenable.addListener(this, directExecutor()); - } + } + }, + directExecutor()); + } + } else { + /* + * We'll call the user callback or collect the values only when all inputs complete, + * regardless of whether some failed. This lets us avoid calling expensive methods like + * Future.get() when we don't need to (specifically, for whenAllComplete().call*()), and it + * lets all futures share the same listener. + * + * We store `localFutures` inside the listener because `this.futures` might be nulled out by + * the time the listener runs for the final future -- at which point we need to check all + * inputs for exceptions *if* we're collecting values. If we're not, then the listener doesn't + * need access to the futures again, so we can just pass `null`. + * + * TODO(b/112550045): Allocating a single, cheaper listener is (I think) only an optimization. + * If we make some other optimizations, this one will no longer be necessary. The optimization + * could actually hurt in some cases, as it forces us to keep all inputs in memory until the + * final input completes. + */ + final ImmutableCollection> localFutures = + collectsValues ? futures : null; + Runnable listener = + new Runnable() { + @Override + public void run() { + decrementCountAndMaybeComplete(localFutures); + } + }; + for (ListenableFuture future : futures) { + future.addListener(listener, directExecutor()); } } + } - /** - * Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the - * throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the - * throwable did not cause this future to fail, and it is the first time we've seen that - * particular Throwable. - */ - private void handleException(Throwable throwable) { - checkNotNull(throwable); + /** + * Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the + * throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the + * throwable did not cause this future to fail, and it is the first time we've seen that + * particular Throwable. + */ + private void handleException(Throwable throwable) { + checkNotNull(throwable); - boolean completedWithFailure = false; - boolean firstTimeSeeingThisException = true; - if (allMustSucceed) { - // As soon as the first one fails, throw the exception up. - // The result of all other inputs is then ignored. - completedWithFailure = setException(throwable); - if (completedWithFailure) { - releaseResourcesAfterFailure(); - } else { - // Go up the causal chain to see if we've already seen this cause; if we have, even if - // it's wrapped by a different exception, don't log it. - firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable); + if (allMustSucceed) { + // As soon as the first one fails, make that failure the result of the output future. + // The results of all other inputs are then ignored (except for logging any failures). + boolean completedWithFailure = setException(throwable); + if (!completedWithFailure) { + // Go up the causal chain to see if we've already seen this cause; if we have, even if + // it's wrapped by a different exception, don't log it. + boolean firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable); + if (firstTimeSeeingThisException) { + log(throwable); + return; } } - - // | and & used because it's faster than the branch required for || and && - if (throwable instanceof Error - | (allMustSucceed & !completedWithFailure & firstTimeSeeingThisException)) { - String message = - (throwable instanceof Error) - ? "Input Future failed with Error" - : "Got more than one input Future failure. Logging failures after the first"; - logger.log(Level.SEVERE, message, throwable); - } } - @Override - final void addInitialException(Set seen) { - if (!isCancelled()) { - // TODO(cpovirk): Think about whether we could/should use Verify to check this. - boolean unused = addCausalChain(seen, tryInternalFastPathGetFailure()); - } + /* + * TODO(cpovirk): Should whenAllComplete().call*() log errors, too? Currently, it doesn't call + * handleException() at all. + */ + if (throwable instanceof Error) { + /* + * TODO(cpovirk): Do we really want to log this if we called setException(throwable) and it + * returned true? This was intentional (CL 46470009), but it seems odd compared to how we + * normally handle Error. + * + * Similarly, do we really want to log the same Error more than once? + */ + log(throwable); } + } - /** Handles the input at the given index completing. */ - private void handleOneInputDone(int index, Future future) { - // The only cases in which this Future should already be done are (a) if it was cancelled or - // (b) if an input failed and we propagated that immediately because of allMustSucceed. - checkState( - allMustSucceed || !isDone() || isCancelled(), - "Future was done before all dependencies completed"); + private static void log(Throwable throwable) { + String message = + (throwable instanceof Error) + ? "Input Future failed with Error" + : "Got more than one input Future failure. Logging failures after the first"; + logger.log(SEVERE, message, throwable); + } - try { - checkState(future.isDone(), "Tried to set value from future which is not done"); - if (allMustSucceed) { - if (future.isCancelled()) { - // clear running state prior to cancelling children, this sets our own state but lets - // the input futures keep running as some of them may be used elsewhere. - runningState = null; - cancel(false); - } else { - // We always get the result so that we can have fail-fast, even if we don't collect - InputT result = getDone(future); - if (collectsValues) { - collectOneValue(allMustSucceed, index, result); - } - } - } else if (collectsValues && !future.isCancelled()) { - collectOneValue(allMustSucceed, index, getDone(future)); - } - } catch (ExecutionException e) { - handleException(e.getCause()); - } catch (Throwable t) { - handleException(t); - } + @Override + final void addInitialException(Set seen) { + checkNotNull(seen); + if (!isCancelled()) { + // TODO(cpovirk): Think about whether we could/should use Verify to check this. + boolean unused = addCausalChain(seen, tryInternalFastPathGetFailure()); } + } - private void decrementCountAndMaybeComplete() { - int newRemaining = decrementRemainingAndGet(); - checkState(newRemaining >= 0, "Less than 0 remaining futures"); - if (newRemaining == 0) { - processCompleted(); - } + /** + * Collects the result (success or failure) of one input future. The input must not have been + * cancelled. For details on when this is called, see {@link #collectOneValue}. + */ + private void collectValueFromNonCancelledFuture(int index, Future future) { + try { + // We get the result, even if collectOneValue is a no-op, so that we can fail fast. + collectOneValue(index, getDone(future)); + } catch (ExecutionException e) { + handleException(e.getCause()); + } catch (Throwable t) { + handleException(t); } + } + + private void decrementCountAndMaybeComplete( + @Nullable + ImmutableCollection> + futuresIfNeedToCollectAtCompletion) { + int newRemaining = decrementRemainingAndGet(); + checkState(newRemaining >= 0, "Less than 0 remaining futures"); + if (newRemaining == 0) { + processCompleted(futuresIfNeedToCollectAtCompletion); + } + } - private void processCompleted() { - // Collect the values if (a) our output requires collecting them and (b) we haven't been - // collecting them as we go. (We've collected them as we go only if we needed to fail fast) - if (collectsValues & !allMustSucceed) { - int i = 0; - for (ListenableFuture listenable : futures) { - handleOneInputDone(i++, listenable); + private void processCompleted( + @Nullable + ImmutableCollection> + futuresIfNeedToCollectAtCompletion) { + if (futuresIfNeedToCollectAtCompletion != null) { + int i = 0; + for (Future future : futuresIfNeedToCollectAtCompletion) { + if (!future.isCancelled()) { + collectValueFromNonCancelledFuture(i, future); } + i++; } - handleAllCompleted(); } - - /** - * Listeners implicitly keep a reference to {@link RunningState} as they're inner classes, so we - * free resources here as well for the allMustSucceed=true case (i.e. when a future fails, we - * immediately release resources we no longer need); additionally, the future will release its - * reference to {@link RunningState}, which should free all associated memory when all the - * futures complete and the listeners are released. - * - *

TODO(user): Write tests for memory retention + clearSeenExceptions(); + handleAllCompleted(); + /* + * Null out fields, including some used in handleAllCompleted() above (like + * `CollectionFuture.values`). This might be a no-op: If this future completed during + * handleAllCompleted(), they will already have been nulled out. But in the case of + * whenAll*().call*(), this future may be pending until the callback runs -- or even longer in + * the case of callAsync(), which waits for the callback's returned future to complete. */ - @ForOverride - @OverridingMethodsMustInvokeSuper - void releaseResourcesAfterFailure() { - this.futures = null; - } + releaseResources(ALL_INPUT_FUTURES_PROCESSED); + } - /** - * Called only if {@code collectsValues} is true. - * - *

If {@code allMustSucceed} is true, called as each future completes; otherwise, called for - * each future when all futures complete. + /** + * Clears fields that are no longer needed after this future has completed -- or at least all its + * inputs have completed (more precisely, after {@link #handleAllCompleted()} has been called). + * Often called multiple times (that is, both when the inputs complete and when the output + * completes). + * + *

This is similar to our proposed {@code afterCommit} method but not quite the same. See the + * description of CL 265462958. + */ + // TODO(user): Write more tests for memory retention. + @ForOverride + @OverridingMethodsMustInvokeSuper + void releaseResources(ReleaseResourcesReason reason) { + checkNotNull(reason); + /* + * All elements of `futures` are completed, or this future has already completed and read + * `futures` into a local variable (in preparation for propagating cancellation to them). In + * either case, no one needs to read `futures` for cancellation purposes later. (And + * cancellation purposes are the main reason to access `futures`, as discussed in its docs.) */ - abstract void collectOneValue(boolean allMustSucceed, int index, @Nullable InputT returnValue); - - abstract void handleAllCompleted(); + this.futures = null; + } - void interruptTask() {} + enum ReleaseResourcesReason { + OUTPUT_FUTURE_DONE, + ALL_INPUT_FUTURES_PROCESSED, } + /** + * If {@code allMustSucceed} is true, called as each future completes; otherwise, if {@code + * collectsValues} is true, called for each future when all futures complete. + */ + abstract void collectOneValue(int index, @Nullable InputT returnValue); + + abstract void handleAllCompleted(); + /** Adds the chain to the seen set, and returns whether all the chain was new to us. */ private static boolean addCausalChain(Set seen, Throwable t) { for (; t != null; t = t.getCause()) { diff --git a/guava/src/com/google/common/util/concurrent/AggregateFutureState.java b/guava/src/com/google/common/util/concurrent/AggregateFutureState.java index 040d81363c47..f8398d817eb2 100644 --- a/guava/src/com/google/common/util/concurrent/AggregateFutureState.java +++ b/guava/src/com/google/common/util/concurrent/AggregateFutureState.java @@ -37,9 +37,9 @@ */ @GwtCompatible(emulated = true) @ReflectionSupport(value = ReflectionSupport.Level.FULL) -abstract class AggregateFutureState { +abstract class AggregateFutureState extends AbstractFuture.TrustedFuture { // Lazily initialized the first time we see an exception; not released until all the input futures - // & this future completes. Released when the future releases the reference to the running state + // have completed and we have processed them all. private volatile Set seenExceptions = null; private volatile int remaining; @@ -89,12 +89,27 @@ final Set getOrInitSeenExceptions() { * Thread2: calls setException(), which returns false, CASes seenExceptions to its exception, * and wrongly believes that its exception is new (leading it to logging it when it shouldn't) * - * Our solution is for threads to CAS seenExceptions from null to a Set population with _the + * Our solution is for threads to CAS seenExceptions from null to a Set populated with _the * initial exception_, no matter which thread does the work. This ensures that seenExceptions * always contains not just the current thread's exception but also the initial thread's. */ Set seenExceptionsLocal = seenExceptions; if (seenExceptionsLocal == null) { + // TODO(cpovirk): Should we use a simpler (presumably cheaper) data structure? + /* + * Using weak references here could let us release exceptions earlier, but: + * + * 1. On Android, querying a WeakReference blocks if the GC is doing an otherwise-concurrent + * pass. + * + * 2. We would probably choose to compare exceptions using == instead of equals() (for + * consistency with how weak references are cleared). That's a behavior change -- arguably the + * removal of a feature. + * + * Fortunately, exceptions rarely contain references to expensive resources. + */ + + // seenExceptionsLocal = newConcurrentHashSet(); /* * Other handleException() callers may see this as soon as we publish it. We need to populate @@ -122,6 +137,10 @@ final int decrementRemainingAndGet() { return ATOMIC_HELPER.decrementAndGetRemainingCount(this); } + final void clearSeenExceptions() { + seenExceptions = null; + } + private abstract static class AtomicHelper { /** Atomic compare-and-set of the {@link AggregateFutureState#seenExceptions} field. */ abstract void compareAndSetSeenExceptions( @@ -169,8 +188,7 @@ void compareAndSetSeenExceptions( @Override int decrementAndGetRemainingCount(AggregateFutureState state) { synchronized (state) { - state.remaining--; - return state.remaining; + return --state.remaining; } } } diff --git a/guava/src/com/google/common/util/concurrent/CollectionFuture.java b/guava/src/com/google/common/util/concurrent/CollectionFuture.java index b1412d8dc0c7..e2ddf5ddb8e0 100644 --- a/guava/src/com/google/common/util/concurrent/CollectionFuture.java +++ b/guava/src/com/google/common/util/concurrent/CollectionFuture.java @@ -14,7 +14,6 @@ package com.google.common.util.concurrent; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Lists.newArrayListWithCapacity; import static java.util.Collections.unmodifiableList; @@ -29,83 +28,64 @@ /** Aggregate future that collects (stores) results of each future. */ @GwtCompatible(emulated = true) abstract class CollectionFuture extends AggregateFuture { + private List> values; - abstract class CollectionFutureRunningState extends RunningState { - private List> values; + CollectionFuture( + ImmutableCollection> futures, + boolean allMustSucceed) { + super(futures, allMustSucceed, true); - CollectionFutureRunningState( - ImmutableCollection> futures, - boolean allMustSucceed) { - super(futures, allMustSucceed, true); - - this.values = - futures.isEmpty() - ? ImmutableList.>of() - : Lists.>newArrayListWithCapacity(futures.size()); + this.values = + futures.isEmpty() + ? ImmutableList.>of() + : Lists.>newArrayListWithCapacity(futures.size()); - // Populate the results list with null initially. - for (int i = 0; i < futures.size(); ++i) { - values.add(null); - } + // Populate the results list with null initially. + for (int i = 0; i < futures.size(); ++i) { + values.add(null); } + } - @Override - final void collectOneValue(boolean allMustSucceed, int index, @Nullable V returnValue) { - List> localValues = values; - - if (localValues != null) { - localValues.set(index, Optional.fromNullable(returnValue)); - } else { - // Some other future failed or has been cancelled, causing this one to also be cancelled or - // have an exception set. This should only happen if allMustSucceed is true or if the output - // itself has been cancelled. - checkState( - allMustSucceed || isCancelled(), "Future was done before all dependencies completed"); - } - } - - @Override - final void handleAllCompleted() { - List> localValues = values; - if (localValues != null) { - set(combine(localValues)); - } else { - checkState(isDone()); - } + @Override + final void collectOneValue(int index, @Nullable V returnValue) { + List> localValues = values; + if (localValues != null) { + localValues.set(index, Optional.fromNullable(returnValue)); } + } - @Override - void releaseResourcesAfterFailure() { - super.releaseResourcesAfterFailure(); - this.values = null; + @Override + final void handleAllCompleted() { + List> localValues = values; + if (localValues != null) { + set(combine(localValues)); } + } - abstract C combine(List> values); + @Override + void releaseResources(ReleaseResourcesReason reason) { + super.releaseResources(reason); + this.values = null; } + abstract C combine(List> values); + /** Used for {@link Futures#allAsList} and {@link Futures#successfulAsList}. */ static final class ListFuture extends CollectionFuture> { ListFuture( ImmutableCollection> futures, boolean allMustSucceed) { - init(new ListFutureRunningState(futures, allMustSucceed)); + super(futures, allMustSucceed); + init(); } - private final class ListFutureRunningState extends CollectionFutureRunningState { - ListFutureRunningState( - ImmutableCollection> futures, - boolean allMustSucceed) { - super(futures, allMustSucceed); - } - - @Override - public List combine(List> values) { - List result = newArrayListWithCapacity(values.size()); - for (Optional element : values) { - result.add(element != null ? element.orNull() : null); - } - return unmodifiableList(result); + @Override + public List combine(List> values) { + List result = newArrayListWithCapacity(values.size()); + for (Optional element : values) { + result.add(element != null ? element.orNull() : null); } + return unmodifiableList(result); } } } diff --git a/guava/src/com/google/common/util/concurrent/CombinedFuture.java b/guava/src/com/google/common/util/concurrent/CombinedFuture.java index c50fc0bc7753..d9e433437f10 100644 --- a/guava/src/com/google/common/util/concurrent/CombinedFuture.java +++ b/guava/src/com/google/common/util/concurrent/CombinedFuture.java @@ -15,7 +15,7 @@ package com.google.common.util.concurrent; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; +import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE; import com.google.common.annotations.GwtCompatible; import com.google.common.collect.ImmutableCollection; @@ -30,16 +30,16 @@ /** Aggregate future that computes its value by calling a callable. */ @GwtCompatible final class CombinedFuture extends AggregateFuture { + private CombinedFutureInterruptibleTask task; + CombinedFuture( ImmutableCollection> futures, boolean allMustSucceed, Executor listenerExecutor, AsyncCallable callable) { - init( - new CombinedFutureRunningState( - futures, - allMustSucceed, - new AsyncCallableInterruptibleTask(callable, listenerExecutor))); + super(futures, allMustSucceed, false); + this.task = new AsyncCallableInterruptibleTask(callable, listenerExecutor); + init(); } CombinedFuture( @@ -47,47 +47,42 @@ final class CombinedFuture extends AggregateFuture { boolean allMustSucceed, Executor listenerExecutor, Callable callable) { - init( - new CombinedFutureRunningState( - futures, allMustSucceed, new CallableInterruptibleTask(callable, listenerExecutor))); + super(futures, allMustSucceed, false); + this.task = new CallableInterruptibleTask(callable, listenerExecutor); + init(); } - private final class CombinedFutureRunningState extends RunningState { - private CombinedFutureInterruptibleTask task; - - CombinedFutureRunningState( - ImmutableCollection> futures, - boolean allMustSucceed, - CombinedFutureInterruptibleTask task) { - super(futures, allMustSucceed, false); - this.task = task; - } + @Override + void collectOneValue(int index, @Nullable Object returnValue) {} - @Override - void collectOneValue(boolean allMustSucceed, int index, @Nullable Object returnValue) {} - - @Override - void handleAllCompleted() { - CombinedFutureInterruptibleTask localTask = task; - if (localTask != null) { - localTask.execute(); - } else { - checkState(isDone()); - } + @Override + void handleAllCompleted() { + CombinedFutureInterruptibleTask localTask = task; + if (localTask != null) { + localTask.execute(); } + } - @Override - void releaseResourcesAfterFailure() { - super.releaseResourcesAfterFailure(); + @Override + void releaseResources(ReleaseResourcesReason reason) { + super.releaseResources(reason); + /* + * If the output future is done, then it won't need to interrupt the task later, so it can clear + * its reference to it. + * + * If the output future is *not* done, then the task field will be cleared after the task runs + * or after the output future is done, whichever comes first. + */ + if (reason == OUTPUT_FUTURE_DONE) { this.task = null; } + } - @Override - void interruptTask() { - CombinedFutureInterruptibleTask localTask = task; - if (localTask != null) { - localTask.interruptTask(); - } + @Override + protected void interruptTask() { + CombinedFutureInterruptibleTask localTask = task; + if (localTask != null) { + localTask.interruptTask(); } } @@ -96,7 +91,7 @@ private abstract class CombinedFutureInterruptibleTask extends InterruptibleT private final Executor listenerExecutor; boolean thrownByExecute = true; - public CombinedFutureInterruptibleTask(Executor listenerExecutor) { + CombinedFutureInterruptibleTask(Executor listenerExecutor) { this.listenerExecutor = checkNotNull(listenerExecutor); } @@ -117,6 +112,19 @@ final void execute() { @Override final void afterRanInterruptibly(T result, Throwable error) { + /* + * The future no longer needs to interrupt this task, so it no longer needs a reference to it. + * + * TODO(cpovirk): It might be nice for our InterruptibleTask subclasses to null out their + * `callable` fields automatically. That would make it less important for us to null out the + * reference to `task` here (though it's still nice to do so in case our reference to the + * executor keeps it alive). Ideally, nulling out `callable` would be the responsibility of + * InterruptibleTask itself so that its other subclasses also benefit. (Handling `callable` in + * InterruptibleTask itself might also eliminate some of the existing boilerplate for, e.g., + * pendingToString().) + */ + CombinedFuture.this.task = null; + if (error != null) { if (error instanceof ExecutionException) { setException(error.getCause()); @@ -138,7 +146,7 @@ private final class AsyncCallableInterruptibleTask extends CombinedFutureInterruptibleTask> { private final AsyncCallable callable; - public AsyncCallableInterruptibleTask(AsyncCallable callable, Executor listenerExecutor) { + AsyncCallableInterruptibleTask(AsyncCallable callable, Executor listenerExecutor) { super(listenerExecutor); this.callable = checkNotNull(callable); } @@ -157,9 +165,6 @@ ListenableFuture runInterruptibly() throws Exception { @Override void setValue(ListenableFuture value) { setFuture(value); - // Eagerly release resources instead of waiting for afterDone. We are done with the inputs, - // but the actual future may not complete for arbitrarily long. - releaseResources(); } @Override @@ -172,7 +177,7 @@ String toPendingString() { private final class CallableInterruptibleTask extends CombinedFutureInterruptibleTask { private final Callable callable; - public CallableInterruptibleTask(Callable callable, Executor listenerExecutor) { + CallableInterruptibleTask(Callable callable, Executor listenerExecutor) { super(listenerExecutor); this.callable = checkNotNull(callable); }