diff --git a/android/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java b/android/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java index 089fa1dc6676..dc319016bf2f 100644 --- a/android/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java +++ b/android/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java @@ -46,6 +46,7 @@ import static com.google.common.util.concurrent.TestPlatform.getDoneFromTimeoutOverload; import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; +import static com.google.common.util.concurrent.testing.TestingExecutors.noOpScheduledExecutor; import static java.util.Arrays.asList; import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; @@ -2803,33 +2804,82 @@ public ListenableFuture call() throws Exception { @AndroidIncompatible @GwtIncompatible - public void testWhenAllSucceed_releasesMemory() throws Exception { + public void testWhenAllSucceed_releasesInputFuturesUponSubmission() throws Exception { SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); WeakReference> future1Ref = new WeakReference<>(future1); WeakReference> future2Ref = new WeakReference<>(future2); - AsyncCallable combiner = - new AsyncCallable() { + Callable combiner = + new Callable() { @Override - public ListenableFuture call() throws Exception { - return SettableFuture.create(); + public Long call() { + throw new AssertionError(); } }; ListenableFuture unused = - whenAllSucceed(future1, future2).callAsync(combiner, directExecutor()); + whenAllSucceed(future1, future2).call(combiner, noOpScheduledExecutor()); future1.set(1L); future1 = null; future2.set(2L); future2 = null; - // Futures should be collected even if combiner future never finishes. + /* + * Futures should be collected even if combiner never runs. This is kind of a silly test, since + * the combiner is almost certain to hold its own reference to the futures, and a real app would + * hold a reference to the executor and thus to the combiner. What we really care about is that + * the futures are released once the combiner is done running. But we happen to provide this + * earlier cleanup at the moment, so we're testing it. + */ GcFinalization.awaitClear(future1Ref); GcFinalization.awaitClear(future2Ref); } + @AndroidIncompatible + @GwtIncompatible + public void testWhenAllComplete_releasesInputFuturesUponCancellation() throws Exception { + SettableFuture future = SettableFuture.create(); + WeakReference> futureRef = new WeakReference<>(future); + + Callable combiner = + new Callable() { + @Override + public Long call() { + throw new AssertionError(); + } + }; + + ListenableFuture unused = whenAllComplete(future).call(combiner, noOpScheduledExecutor()); + + unused.cancel(false); + future = null; + + // Future should be collected because whenAll*Complete* doesn't need to look at its result. + GcFinalization.awaitClear(futureRef); + } + + @AndroidIncompatible + @GwtIncompatible + public void testWhenAllSucceed_releasesCallable() throws Exception { + AsyncCallable combiner = + new AsyncCallable() { + @Override + public ListenableFuture call() { + return SettableFuture.create(); + } + }; + WeakReference> combinerRef = new WeakReference<>(combiner); + + ListenableFuture unused = + whenAllSucceed(immediateFuture(1L)).callAsync(combiner, directExecutor()); + + combiner = null; + // combiner should be collected even if the future it returns never completes. + GcFinalization.awaitClear(combinerRef); + } + /* * TODO(cpovirk): maybe pass around TestFuture instances instead of * ListenableFuture instances @@ -3463,6 +3513,24 @@ public void testSuccessfulAsList_logging_error() throws Exception { assertThat(logged.get(0).getThrown()).isInstanceOf(MyError.class); } + public void testSuccessfulAsList_failureLoggedEvenAfterOutputCancelled() throws Exception { + ListenableFuture input = new CancelPanickingFuture<>(); + ListenableFuture> output = successfulAsList(input); + output.cancel(false); + + List logged = aggregateFutureLogHandler.getStoredLogRecords(); + assertThat(logged).hasSize(1); + assertThat(logged.get(0).getThrown()).hasMessageThat().isEqualTo("You can't fire me, I quit."); + } + + private static final class CancelPanickingFuture extends AbstractFuture { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + setException(new Error("You can't fire me, I quit.")); + return false; + } + } + public void testNonCancellationPropagating_successful() throws Exception { SettableFuture input = SettableFuture.create(); ListenableFuture wrapper = nonCancellationPropagating(input); @@ -3517,22 +3585,6 @@ private static class TestException extends Exception { } } - @GwtIncompatible // used only in GwtIncompatible tests - private static final Function mapper = - new Function() { - @Override - public TestException apply(Exception from) { - if (from instanceof ExecutionException) { - return new TestException(from.getCause()); - } else { - assertTrue( - "got " + from.getClass(), - from instanceof InterruptedException || from instanceof CancellationException); - return new TestException(from); - } - } - }; - @GwtIncompatible // used only in GwtIncompatible tests private interface MapperFunction extends Function {} diff --git a/android/guava/src/com/google/common/util/concurrent/AggregateFuture.java b/android/guava/src/com/google/common/util/concurrent/AggregateFuture.java index 18e0e9e898b2..ae3fde8858c9 100644 --- a/android/guava/src/com/google/common/util/concurrent/AggregateFuture.java +++ b/android/guava/src/com/google/common/util/concurrent/AggregateFuture.java @@ -16,8 +16,11 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.ALL_INPUT_FUTURES_PROCESSED; +import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE; import static com.google.common.util.concurrent.Futures.getDone; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static java.util.logging.Level.SEVERE; import com.google.common.annotations.GwtCompatible; import com.google.common.collect.ImmutableCollection; @@ -26,270 +29,303 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.logging.Level; import java.util.logging.Logger; import org.checkerframework.checker.nullness.compatqual.NullableDecl; /** - * A future made up of a collection of sub-futures. + * A future whose value is derived from a collection of input futures. * * @param the type of the individual inputs * @param the type of the output (i.e. this) future */ @GwtCompatible -abstract class AggregateFuture extends AbstractFuture.TrustedFuture { +abstract class AggregateFuture extends AggregateFutureState { private static final Logger logger = Logger.getLogger(AggregateFuture.class.getName()); + /** + * The input futures. After {@link #init}, this field is read only by {@link #afterDone()} (to + * propagate cancellation) and {@link #toString()}. To access the futures' values, {@code + * AggregateFuture} attaches listeners that hold references to one or more inputs. And in the case + * of {@link CombinedFuture}, the user-supplied callback usually has its own references to inputs. + */ /* * In certain circumstances, this field might theoretically not be visible to an afterDone() call * triggered by cancel(). For details, see the comments on the fields of TimeoutFuture. */ - @NullableDecl private RunningState runningState; + @NullableDecl private ImmutableCollection> futures; + + private final boolean allMustSucceed; + private final boolean collectsValues; + + AggregateFuture( + ImmutableCollection> futures, + boolean allMustSucceed, + boolean collectsValues) { + super(futures.size()); + this.futures = checkNotNull(futures); + this.allMustSucceed = allMustSucceed; + this.collectsValues = collectsValues; + } @Override protected final void afterDone() { super.afterDone(); - releaseResources(); - } - - protected final void releaseResources() { - RunningState localRunningState = runningState; - if (localRunningState != null) { - // Let go of the memory held by the running state - this.runningState = null; - ImmutableCollection> futures = - localRunningState.futures; - boolean wasInterrupted = wasInterrupted(); - if (wasInterrupted) { - localRunningState.interruptTask(); - } + ImmutableCollection> localFutures = futures; + releaseResources(OUTPUT_FUTURE_DONE); // nulls out `futures` - if (isCancelled() & futures != null) { - for (ListenableFuture future : futures) { - future.cancel(wasInterrupted); - } + if (isCancelled() & localFutures != null) { + boolean wasInterrupted = wasInterrupted(); + for (Future future : localFutures) { + future.cancel(wasInterrupted); } } + /* + * We don't call clearSeenExceptions() until processCompleted(). Prior to that, it may be needed + * again if some outstanding input fails. + */ } @Override - protected String pendingToString() { - RunningState localRunningState = runningState; - if (localRunningState == null) { - return null; - } - ImmutableCollection> localFutures = - localRunningState.futures; + protected final String pendingToString() { + ImmutableCollection> localFutures = futures; if (localFutures != null) { return "futures=[" + localFutures + "]"; } return null; } - /** Must be called at the end of each sub-class's constructor. */ - final void init(RunningState runningState) { - this.runningState = runningState; - runningState.init(); - } - - abstract class RunningState extends AggregateFutureState implements Runnable { - private ImmutableCollection> futures; - private final boolean allMustSucceed; - private final boolean collectsValues; - - RunningState( - ImmutableCollection> futures, - boolean allMustSucceed, - boolean collectsValues) { - super(futures.size()); - this.futures = checkNotNull(futures); - this.allMustSucceed = allMustSucceed; - this.collectsValues = collectsValues; - } - - /* Used in the !allMustSucceed case so we don't have to instantiate a listener. */ - @Override - public final void run() { - decrementCountAndMaybeComplete(); + /** + * Must be called at the end of each subclass's constructor. This method performs the "real" + * initialization; we can't put this in the constructor because, in the case where futures are + * already complete, we would not initialize the subclass before calling {@link + * #collectValueFromNonCancelledFuture}. As this is called after the subclass is constructed, + * we're guaranteed to have properly initialized the subclass. + */ + final void init() { + // Corner case: List is empty. + if (futures.isEmpty()) { + handleAllCompleted(); + return; } - /** - * The "real" initialization; we can't put this in the constructor because, in the case where - * futures are already complete, we would not initialize the subclass before calling {@link - * #handleOneInputDone}. As this is called after the subclass is constructed, we're guaranteed - * to have properly initialized the subclass. - */ - private void init() { - // Corner case: List is empty. - if (futures.isEmpty()) { - handleAllCompleted(); - return; - } - - // NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll - // need to handle RejectedExecutionException + // NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll + // need to handle RejectedExecutionException - if (allMustSucceed) { - // We need fail fast, so we have to keep track of which future failed so we can propagate - // the exception immediately + if (allMustSucceed) { + // We need fail fast, so we have to keep track of which future failed so we can propagate + // the exception immediately - // Register a listener on each Future in the list to update the state of this future. - // Note that if all the futures on the list are done prior to completing this loop, the last - // call to addListener() will callback to setOneValue(), transitively call our cleanup - // listener, and set this.futures to null. - // This is not actually a problem, since the foreach only needs this.futures to be non-null - // at the beginning of the loop. - int i = 0; - for (final ListenableFuture listenable : futures) { - final int index = i++; - listenable.addListener( - new Runnable() { - @Override - public void run() { - try { - handleOneInputDone(index, listenable); - } finally { - decrementCountAndMaybeComplete(); + // Register a listener on each Future in the list to update the state of this future. + // Note that if all the futures on the list are done prior to completing this loop, the last + // call to addListener() will callback to setOneValue(), transitively call our cleanup + // listener, and set this.futures to null. + // This is not actually a problem, since the foreach only needs this.futures to be non-null + // at the beginning of the loop. + int i = 0; + for (final ListenableFuture future : futures) { + final int index = i++; + future.addListener( + new Runnable() { + @Override + public void run() { + try { + if (future.isCancelled()) { + // Clear futures prior to cancelling children. This sets our own state but lets + // the input futures keep running, as some of them may be used elsewhere. + futures = null; + cancel(false); + } else { + collectValueFromNonCancelledFuture(index, future); } + } finally { + /* + * "null" means: There is no need to access `futures` again during + * `processCompleted` because we're reading each value during a call to + * handleOneInputDone. + */ + decrementCountAndMaybeComplete(null); } - }, - directExecutor()); - } - } else { - // We'll only call the callback when all futures complete, regardless of whether some failed - // Hold off on calling setOneValue until all complete, so we can share the same listener - for (ListenableFuture listenable : futures) { - listenable.addListener(this, directExecutor()); - } + } + }, + directExecutor()); + } + } else { + /* + * We'll call the user callback or collect the values only when all inputs complete, + * regardless of whether some failed. This lets us avoid calling expensive methods like + * Future.get() when we don't need to (specifically, for whenAllComplete().call*()), and it + * lets all futures share the same listener. + * + * We store `localFutures` inside the listener because `this.futures` might be nulled out by + * the time the listener runs for the final future -- at which point we need to check all + * inputs for exceptions *if* we're collecting values. If we're not, then the listener doesn't + * need access to the futures again, so we can just pass `null`. + * + * TODO(b/112550045): Allocating a single, cheaper listener is (I think) only an optimization. + * If we make some other optimizations, this one will no longer be necessary. The optimization + * could actually hurt in some cases, as it forces us to keep all inputs in memory until the + * final input completes. + */ + final ImmutableCollection> localFutures = + collectsValues ? futures : null; + Runnable listener = + new Runnable() { + @Override + public void run() { + decrementCountAndMaybeComplete(localFutures); + } + }; + for (ListenableFuture future : futures) { + future.addListener(listener, directExecutor()); } } + } - /** - * Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the - * throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the - * throwable did not cause this future to fail, and it is the first time we've seen that - * particular Throwable. - */ - private void handleException(Throwable throwable) { - checkNotNull(throwable); + /** + * Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the + * throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the + * throwable did not cause this future to fail, and it is the first time we've seen that + * particular Throwable. + */ + private void handleException(Throwable throwable) { + checkNotNull(throwable); - boolean completedWithFailure = false; - boolean firstTimeSeeingThisException = true; - if (allMustSucceed) { - // As soon as the first one fails, throw the exception up. - // The result of all other inputs is then ignored. - completedWithFailure = setException(throwable); - if (completedWithFailure) { - releaseResourcesAfterFailure(); - } else { - // Go up the causal chain to see if we've already seen this cause; if we have, even if - // it's wrapped by a different exception, don't log it. - firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable); + if (allMustSucceed) { + // As soon as the first one fails, make that failure the result of the output future. + // The results of all other inputs are then ignored (except for logging any failures). + boolean completedWithFailure = setException(throwable); + if (!completedWithFailure) { + // Go up the causal chain to see if we've already seen this cause; if we have, even if + // it's wrapped by a different exception, don't log it. + boolean firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable); + if (firstTimeSeeingThisException) { + log(throwable); + return; } } - - // | and & used because it's faster than the branch required for || and && - if (throwable instanceof Error - | (allMustSucceed & !completedWithFailure & firstTimeSeeingThisException)) { - String message = - (throwable instanceof Error) - ? "Input Future failed with Error" - : "Got more than one input Future failure. Logging failures after the first"; - logger.log(Level.SEVERE, message, throwable); - } } - @Override - final void addInitialException(Set seen) { - if (!isCancelled()) { - // TODO(cpovirk): Think about whether we could/should use Verify to check this. - boolean unused = addCausalChain(seen, tryInternalFastPathGetFailure()); - } + /* + * TODO(cpovirk): Should whenAllComplete().call*() log errors, too? Currently, it doesn't call + * handleException() at all. + */ + if (throwable instanceof Error) { + /* + * TODO(cpovirk): Do we really want to log this if we called setException(throwable) and it + * returned true? This was intentional (CL 46470009), but it seems odd compared to how we + * normally handle Error. + * + * Similarly, do we really want to log the same Error more than once? + */ + log(throwable); } + } - /** Handles the input at the given index completing. */ - private void handleOneInputDone(int index, Future future) { - // The only cases in which this Future should already be done are (a) if it was cancelled or - // (b) if an input failed and we propagated that immediately because of allMustSucceed. - checkState( - allMustSucceed || !isDone() || isCancelled(), - "Future was done before all dependencies completed"); + private static void log(Throwable throwable) { + String message = + (throwable instanceof Error) + ? "Input Future failed with Error" + : "Got more than one input Future failure. Logging failures after the first"; + logger.log(SEVERE, message, throwable); + } - try { - checkState(future.isDone(), "Tried to set value from future which is not done"); - if (allMustSucceed) { - if (future.isCancelled()) { - // clear running state prior to cancelling children, this sets our own state but lets - // the input futures keep running as some of them may be used elsewhere. - runningState = null; - cancel(false); - } else { - // We always get the result so that we can have fail-fast, even if we don't collect - InputT result = getDone(future); - if (collectsValues) { - collectOneValue(allMustSucceed, index, result); - } - } - } else if (collectsValues && !future.isCancelled()) { - collectOneValue(allMustSucceed, index, getDone(future)); - } - } catch (ExecutionException e) { - handleException(e.getCause()); - } catch (Throwable t) { - handleException(t); - } + @Override + final void addInitialException(Set seen) { + checkNotNull(seen); + if (!isCancelled()) { + // TODO(cpovirk): Think about whether we could/should use Verify to check this. + boolean unused = addCausalChain(seen, tryInternalFastPathGetFailure()); } + } - private void decrementCountAndMaybeComplete() { - int newRemaining = decrementRemainingAndGet(); - checkState(newRemaining >= 0, "Less than 0 remaining futures"); - if (newRemaining == 0) { - processCompleted(); - } + /** + * Collects the result (success or failure) of one input future. The input must not have been + * cancelled. For details on when this is called, see {@link #collectOneValue}. + */ + private void collectValueFromNonCancelledFuture(int index, Future future) { + try { + // We get the result, even if collectOneValue is a no-op, so that we can fail fast. + collectOneValue(index, getDone(future)); + } catch (ExecutionException e) { + handleException(e.getCause()); + } catch (Throwable t) { + handleException(t); } + } + + private void decrementCountAndMaybeComplete( + @NullableDecl + ImmutableCollection> + futuresIfNeedToCollectAtCompletion) { + int newRemaining = decrementRemainingAndGet(); + checkState(newRemaining >= 0, "Less than 0 remaining futures"); + if (newRemaining == 0) { + processCompleted(futuresIfNeedToCollectAtCompletion); + } + } - private void processCompleted() { - // Collect the values if (a) our output requires collecting them and (b) we haven't been - // collecting them as we go. (We've collected them as we go only if we needed to fail fast) - if (collectsValues & !allMustSucceed) { - int i = 0; - for (ListenableFuture listenable : futures) { - handleOneInputDone(i++, listenable); + private void processCompleted( + @NullableDecl + ImmutableCollection> + futuresIfNeedToCollectAtCompletion) { + if (futuresIfNeedToCollectAtCompletion != null) { + int i = 0; + for (Future future : futuresIfNeedToCollectAtCompletion) { + if (!future.isCancelled()) { + collectValueFromNonCancelledFuture(i, future); } + i++; } - handleAllCompleted(); } - - /** - * Listeners implicitly keep a reference to {@link RunningState} as they're inner classes, so we - * free resources here as well for the allMustSucceed=true case (i.e. when a future fails, we - * immediately release resources we no longer need); additionally, the future will release its - * reference to {@link RunningState}, which should free all associated memory when all the - * futures complete and the listeners are released. - * - *

TODO(user): Write tests for memory retention + clearSeenExceptions(); + handleAllCompleted(); + /* + * Null out fields, including some used in handleAllCompleted() above (like + * `CollectionFuture.values`). This might be a no-op: If this future completed during + * handleAllCompleted(), they will already have been nulled out. But in the case of + * whenAll*().call*(), this future may be pending until the callback runs -- or even longer in + * the case of callAsync(), which waits for the callback's returned future to complete. */ - @ForOverride - @OverridingMethodsMustInvokeSuper - void releaseResourcesAfterFailure() { - this.futures = null; - } + releaseResources(ALL_INPUT_FUTURES_PROCESSED); + } - /** - * Called only if {@code collectsValues} is true. - * - *

If {@code allMustSucceed} is true, called as each future completes; otherwise, called for - * each future when all futures complete. + /** + * Clears fields that are no longer needed after this future has completed -- or at least all its + * inputs have completed (more precisely, after {@link #handleAllCompleted()} has been called). + * Often called multiple times (that is, both when the inputs complete and when the output + * completes). + * + *

This is similar to our proposed {@code afterCommit} method but not quite the same. See the + * description of CL 265462958. + */ + // TODO(user): Write more tests for memory retention. + @ForOverride + @OverridingMethodsMustInvokeSuper + void releaseResources(ReleaseResourcesReason reason) { + checkNotNull(reason); + /* + * All elements of `futures` are completed, or this future has already completed and read + * `futures` into a local variable (in preparation for propagating cancellation to them). In + * either case, no one needs to read `futures` for cancellation purposes later. (And + * cancellation purposes are the main reason to access `futures`, as discussed in its docs.) */ - abstract void collectOneValue( - boolean allMustSucceed, int index, @NullableDecl InputT returnValue); - - abstract void handleAllCompleted(); + this.futures = null; + } - void interruptTask() {} + enum ReleaseResourcesReason { + OUTPUT_FUTURE_DONE, + ALL_INPUT_FUTURES_PROCESSED, } + /** + * If {@code allMustSucceed} is true, called as each future completes; otherwise, if {@code + * collectsValues} is true, called for each future when all futures complete. + */ + abstract void collectOneValue(int index, @NullableDecl InputT returnValue); + + abstract void handleAllCompleted(); + /** Adds the chain to the seen set, and returns whether all the chain was new to us. */ private static boolean addCausalChain(Set seen, Throwable t) { for (; t != null; t = t.getCause()) { diff --git a/android/guava/src/com/google/common/util/concurrent/AggregateFutureState.java b/android/guava/src/com/google/common/util/concurrent/AggregateFutureState.java index 040d81363c47..f8398d817eb2 100644 --- a/android/guava/src/com/google/common/util/concurrent/AggregateFutureState.java +++ b/android/guava/src/com/google/common/util/concurrent/AggregateFutureState.java @@ -37,9 +37,9 @@ */ @GwtCompatible(emulated = true) @ReflectionSupport(value = ReflectionSupport.Level.FULL) -abstract class AggregateFutureState { +abstract class AggregateFutureState extends AbstractFuture.TrustedFuture { // Lazily initialized the first time we see an exception; not released until all the input futures - // & this future completes. Released when the future releases the reference to the running state + // have completed and we have processed them all. private volatile Set seenExceptions = null; private volatile int remaining; @@ -89,12 +89,27 @@ final Set getOrInitSeenExceptions() { * Thread2: calls setException(), which returns false, CASes seenExceptions to its exception, * and wrongly believes that its exception is new (leading it to logging it when it shouldn't) * - * Our solution is for threads to CAS seenExceptions from null to a Set population with _the + * Our solution is for threads to CAS seenExceptions from null to a Set populated with _the * initial exception_, no matter which thread does the work. This ensures that seenExceptions * always contains not just the current thread's exception but also the initial thread's. */ Set seenExceptionsLocal = seenExceptions; if (seenExceptionsLocal == null) { + // TODO(cpovirk): Should we use a simpler (presumably cheaper) data structure? + /* + * Using weak references here could let us release exceptions earlier, but: + * + * 1. On Android, querying a WeakReference blocks if the GC is doing an otherwise-concurrent + * pass. + * + * 2. We would probably choose to compare exceptions using == instead of equals() (for + * consistency with how weak references are cleared). That's a behavior change -- arguably the + * removal of a feature. + * + * Fortunately, exceptions rarely contain references to expensive resources. + */ + + // seenExceptionsLocal = newConcurrentHashSet(); /* * Other handleException() callers may see this as soon as we publish it. We need to populate @@ -122,6 +137,10 @@ final int decrementRemainingAndGet() { return ATOMIC_HELPER.decrementAndGetRemainingCount(this); } + final void clearSeenExceptions() { + seenExceptions = null; + } + private abstract static class AtomicHelper { /** Atomic compare-and-set of the {@link AggregateFutureState#seenExceptions} field. */ abstract void compareAndSetSeenExceptions( @@ -169,8 +188,7 @@ void compareAndSetSeenExceptions( @Override int decrementAndGetRemainingCount(AggregateFutureState state) { synchronized (state) { - state.remaining--; - return state.remaining; + return --state.remaining; } } } diff --git a/android/guava/src/com/google/common/util/concurrent/CollectionFuture.java b/android/guava/src/com/google/common/util/concurrent/CollectionFuture.java index 97476d59b2d1..e2d2f2ffd9d1 100644 --- a/android/guava/src/com/google/common/util/concurrent/CollectionFuture.java +++ b/android/guava/src/com/google/common/util/concurrent/CollectionFuture.java @@ -14,7 +14,6 @@ package com.google.common.util.concurrent; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Lists.newArrayListWithCapacity; import static java.util.Collections.unmodifiableList; @@ -29,83 +28,64 @@ /** Aggregate future that collects (stores) results of each future. */ @GwtCompatible(emulated = true) abstract class CollectionFuture extends AggregateFuture { + private List> values; - abstract class CollectionFutureRunningState extends RunningState { - private List> values; + CollectionFuture( + ImmutableCollection> futures, + boolean allMustSucceed) { + super(futures, allMustSucceed, true); - CollectionFutureRunningState( - ImmutableCollection> futures, - boolean allMustSucceed) { - super(futures, allMustSucceed, true); - - this.values = - futures.isEmpty() - ? ImmutableList.>of() - : Lists.>newArrayListWithCapacity(futures.size()); + this.values = + futures.isEmpty() + ? ImmutableList.>of() + : Lists.>newArrayListWithCapacity(futures.size()); - // Populate the results list with null initially. - for (int i = 0; i < futures.size(); ++i) { - values.add(null); - } + // Populate the results list with null initially. + for (int i = 0; i < futures.size(); ++i) { + values.add(null); } + } - @Override - final void collectOneValue(boolean allMustSucceed, int index, @NullableDecl V returnValue) { - List> localValues = values; - - if (localValues != null) { - localValues.set(index, Optional.fromNullable(returnValue)); - } else { - // Some other future failed or has been cancelled, causing this one to also be cancelled or - // have an exception set. This should only happen if allMustSucceed is true or if the output - // itself has been cancelled. - checkState( - allMustSucceed || isCancelled(), "Future was done before all dependencies completed"); - } - } - - @Override - final void handleAllCompleted() { - List> localValues = values; - if (localValues != null) { - set(combine(localValues)); - } else { - checkState(isDone()); - } + @Override + final void collectOneValue(int index, @NullableDecl V returnValue) { + List> localValues = values; + if (localValues != null) { + localValues.set(index, Optional.fromNullable(returnValue)); } + } - @Override - void releaseResourcesAfterFailure() { - super.releaseResourcesAfterFailure(); - this.values = null; + @Override + final void handleAllCompleted() { + List> localValues = values; + if (localValues != null) { + set(combine(localValues)); } + } - abstract C combine(List> values); + @Override + void releaseResources(ReleaseResourcesReason reason) { + super.releaseResources(reason); + this.values = null; } + abstract C combine(List> values); + /** Used for {@link Futures#allAsList} and {@link Futures#successfulAsList}. */ static final class ListFuture extends CollectionFuture> { ListFuture( ImmutableCollection> futures, boolean allMustSucceed) { - init(new ListFutureRunningState(futures, allMustSucceed)); + super(futures, allMustSucceed); + init(); } - private final class ListFutureRunningState extends CollectionFutureRunningState { - ListFutureRunningState( - ImmutableCollection> futures, - boolean allMustSucceed) { - super(futures, allMustSucceed); - } - - @Override - public List combine(List> values) { - List result = newArrayListWithCapacity(values.size()); - for (Optional element : values) { - result.add(element != null ? element.orNull() : null); - } - return unmodifiableList(result); + @Override + public List combine(List> values) { + List result = newArrayListWithCapacity(values.size()); + for (Optional element : values) { + result.add(element != null ? element.orNull() : null); } + return unmodifiableList(result); } } } diff --git a/android/guava/src/com/google/common/util/concurrent/CombinedFuture.java b/android/guava/src/com/google/common/util/concurrent/CombinedFuture.java index ded4b77b214f..38e939476214 100644 --- a/android/guava/src/com/google/common/util/concurrent/CombinedFuture.java +++ b/android/guava/src/com/google/common/util/concurrent/CombinedFuture.java @@ -15,7 +15,7 @@ package com.google.common.util.concurrent; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; +import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE; import com.google.common.annotations.GwtCompatible; import com.google.common.collect.ImmutableCollection; @@ -30,16 +30,16 @@ /** Aggregate future that computes its value by calling a callable. */ @GwtCompatible final class CombinedFuture extends AggregateFuture { + private CombinedFutureInterruptibleTask task; + CombinedFuture( ImmutableCollection> futures, boolean allMustSucceed, Executor listenerExecutor, AsyncCallable callable) { - init( - new CombinedFutureRunningState( - futures, - allMustSucceed, - new AsyncCallableInterruptibleTask(callable, listenerExecutor))); + super(futures, allMustSucceed, false); + this.task = new AsyncCallableInterruptibleTask(callable, listenerExecutor); + init(); } CombinedFuture( @@ -47,47 +47,42 @@ final class CombinedFuture extends AggregateFuture { boolean allMustSucceed, Executor listenerExecutor, Callable callable) { - init( - new CombinedFutureRunningState( - futures, allMustSucceed, new CallableInterruptibleTask(callable, listenerExecutor))); + super(futures, allMustSucceed, false); + this.task = new CallableInterruptibleTask(callable, listenerExecutor); + init(); } - private final class CombinedFutureRunningState extends RunningState { - private CombinedFutureInterruptibleTask task; - - CombinedFutureRunningState( - ImmutableCollection> futures, - boolean allMustSucceed, - CombinedFutureInterruptibleTask task) { - super(futures, allMustSucceed, false); - this.task = task; - } + @Override + void collectOneValue(int index, @NullableDecl Object returnValue) {} - @Override - void collectOneValue(boolean allMustSucceed, int index, @NullableDecl Object returnValue) {} - - @Override - void handleAllCompleted() { - CombinedFutureInterruptibleTask localTask = task; - if (localTask != null) { - localTask.execute(); - } else { - checkState(isDone()); - } + @Override + void handleAllCompleted() { + CombinedFutureInterruptibleTask localTask = task; + if (localTask != null) { + localTask.execute(); } + } - @Override - void releaseResourcesAfterFailure() { - super.releaseResourcesAfterFailure(); + @Override + void releaseResources(ReleaseResourcesReason reason) { + super.releaseResources(reason); + /* + * If the output future is done, then it won't need to interrupt the task later, so it can clear + * its reference to it. + * + * If the output future is *not* done, then the task field will be cleared after the task runs + * or after the output future is done, whichever comes first. + */ + if (reason == OUTPUT_FUTURE_DONE) { this.task = null; } + } - @Override - void interruptTask() { - CombinedFutureInterruptibleTask localTask = task; - if (localTask != null) { - localTask.interruptTask(); - } + @Override + protected void interruptTask() { + CombinedFutureInterruptibleTask localTask = task; + if (localTask != null) { + localTask.interruptTask(); } } @@ -96,7 +91,7 @@ private abstract class CombinedFutureInterruptibleTask extends InterruptibleT private final Executor listenerExecutor; boolean thrownByExecute = true; - public CombinedFutureInterruptibleTask(Executor listenerExecutor) { + CombinedFutureInterruptibleTask(Executor listenerExecutor) { this.listenerExecutor = checkNotNull(listenerExecutor); } @@ -117,6 +112,19 @@ final void execute() { @Override final void afterRanInterruptibly(T result, Throwable error) { + /* + * The future no longer needs to interrupt this task, so it no longer needs a reference to it. + * + * TODO(cpovirk): It might be nice for our InterruptibleTask subclasses to null out their + * `callable` fields automatically. That would make it less important for us to null out the + * reference to `task` here (though it's still nice to do so in case our reference to the + * executor keeps it alive). Ideally, nulling out `callable` would be the responsibility of + * InterruptibleTask itself so that its other subclasses also benefit. (Handling `callable` in + * InterruptibleTask itself might also eliminate some of the existing boilerplate for, e.g., + * pendingToString().) + */ + CombinedFuture.this.task = null; + if (error != null) { if (error instanceof ExecutionException) { setException(error.getCause()); @@ -138,7 +146,7 @@ private final class AsyncCallableInterruptibleTask extends CombinedFutureInterruptibleTask> { private final AsyncCallable callable; - public AsyncCallableInterruptibleTask(AsyncCallable callable, Executor listenerExecutor) { + AsyncCallableInterruptibleTask(AsyncCallable callable, Executor listenerExecutor) { super(listenerExecutor); this.callable = checkNotNull(callable); } @@ -157,9 +165,6 @@ ListenableFuture runInterruptibly() throws Exception { @Override void setValue(ListenableFuture value) { setFuture(value); - // Eagerly release resources instead of waiting for afterDone. We are done with the inputs, - // but the actual future may not complete for arbitrarily long. - releaseResources(); } @Override @@ -172,7 +177,7 @@ String toPendingString() { private final class CallableInterruptibleTask extends CombinedFutureInterruptibleTask { private final Callable callable; - public CallableInterruptibleTask(Callable callable, Executor listenerExecutor) { + CallableInterruptibleTask(Callable callable, Executor listenerExecutor) { super(listenerExecutor); this.callable = checkNotNull(callable); } diff --git a/guava-gwt/src-super/com/google/common/util/concurrent/super/com/google/common/util/concurrent/AggregateFutureState.java b/guava-gwt/src-super/com/google/common/util/concurrent/super/com/google/common/util/concurrent/AggregateFutureState.java index 91c603b90e0a..0058c34c03dc 100644 --- a/guava-gwt/src-super/com/google/common/util/concurrent/super/com/google/common/util/concurrent/AggregateFutureState.java +++ b/guava-gwt/src-super/com/google/common/util/concurrent/super/com/google/common/util/concurrent/AggregateFutureState.java @@ -21,7 +21,7 @@ import java.util.Set; /** Emulation of AggregateFutureState. */ -abstract class AggregateFutureState { +abstract class AggregateFutureState extends AbstractFuture.TrustedFuture { // Lazily initialized the first time we see an exception; not released until all the input futures // & this future completes. Released when the future releases the reference to the running state private Set seenExceptions = null; @@ -44,4 +44,8 @@ final Set getOrInitSeenExceptions() { final int decrementRemainingAndGet() { return --remaining; } + + final void clearSeenExceptions() { + seenExceptions = null; + } } diff --git a/guava-gwt/test/com/google/common/util/concurrent/FuturesTest_gwt.java b/guava-gwt/test/com/google/common/util/concurrent/FuturesTest_gwt.java index dacbfa1b07f2..427eed9f70ca 100644 --- a/guava-gwt/test/com/google/common/util/concurrent/FuturesTest_gwt.java +++ b/guava-gwt/test/com/google/common/util/concurrent/FuturesTest_gwt.java @@ -2016,6 +2016,33 @@ public void testSuccessfulAsList_emptyList() throws Exception { } } +public void testSuccessfulAsList_failureLoggedEvenAfterOutputCancelled() throws Exception { + com.google.common.util.concurrent.FuturesTest testCase = new com.google.common.util.concurrent.FuturesTest(); + testCase.setUp(); + Throwable failure = null; + try { + testCase.testSuccessfulAsList_failureLoggedEvenAfterOutputCancelled(); + } catch (Throwable t) { + failure = t; + } + try { + testCase.tearDown(); + } catch (Throwable t) { + if (failure == null) { + failure = t; + } + } + if (failure instanceof Exception) { + throw (Exception) failure; + } + if (failure instanceof Error) { + throw (Error) failure; + } + if (failure != null) { + throw new RuntimeException(failure); + } +} + public void testSuccessfulAsList_logging_error() throws Exception { com.google.common.util.concurrent.FuturesTest testCase = new com.google.common.util.concurrent.FuturesTest(); testCase.setUp(); diff --git a/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java b/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java index 571bed062dde..1eef3d6505c2 100644 --- a/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java +++ b/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java @@ -46,6 +46,7 @@ import static com.google.common.util.concurrent.TestPlatform.getDoneFromTimeoutOverload; import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; +import static com.google.common.util.concurrent.testing.TestingExecutors.noOpScheduledExecutor; import static java.util.Arrays.asList; import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; @@ -2803,33 +2804,82 @@ public ListenableFuture call() throws Exception { @AndroidIncompatible @GwtIncompatible - public void testWhenAllSucceed_releasesMemory() throws Exception { + public void testWhenAllSucceed_releasesInputFuturesUponSubmission() throws Exception { SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); WeakReference> future1Ref = new WeakReference<>(future1); WeakReference> future2Ref = new WeakReference<>(future2); - AsyncCallable combiner = - new AsyncCallable() { + Callable combiner = + new Callable() { @Override - public ListenableFuture call() throws Exception { - return SettableFuture.create(); + public Long call() { + throw new AssertionError(); } }; ListenableFuture unused = - whenAllSucceed(future1, future2).callAsync(combiner, directExecutor()); + whenAllSucceed(future1, future2).call(combiner, noOpScheduledExecutor()); future1.set(1L); future1 = null; future2.set(2L); future2 = null; - // Futures should be collected even if combiner future never finishes. + /* + * Futures should be collected even if combiner never runs. This is kind of a silly test, since + * the combiner is almost certain to hold its own reference to the futures, and a real app would + * hold a reference to the executor and thus to the combiner. What we really care about is that + * the futures are released once the combiner is done running. But we happen to provide this + * earlier cleanup at the moment, so we're testing it. + */ GcFinalization.awaitClear(future1Ref); GcFinalization.awaitClear(future2Ref); } + @AndroidIncompatible + @GwtIncompatible + public void testWhenAllComplete_releasesInputFuturesUponCancellation() throws Exception { + SettableFuture future = SettableFuture.create(); + WeakReference> futureRef = new WeakReference<>(future); + + Callable combiner = + new Callable() { + @Override + public Long call() { + throw new AssertionError(); + } + }; + + ListenableFuture unused = whenAllComplete(future).call(combiner, noOpScheduledExecutor()); + + unused.cancel(false); + future = null; + + // Future should be collected because whenAll*Complete* doesn't need to look at its result. + GcFinalization.awaitClear(futureRef); + } + + @AndroidIncompatible + @GwtIncompatible + public void testWhenAllSucceed_releasesCallable() throws Exception { + AsyncCallable combiner = + new AsyncCallable() { + @Override + public ListenableFuture call() { + return SettableFuture.create(); + } + }; + WeakReference> combinerRef = new WeakReference<>(combiner); + + ListenableFuture unused = + whenAllSucceed(immediateFuture(1L)).callAsync(combiner, directExecutor()); + + combiner = null; + // combiner should be collected even if the future it returns never completes. + GcFinalization.awaitClear(combinerRef); + } + /* * TODO(cpovirk): maybe pass around TestFuture instances instead of * ListenableFuture instances @@ -3463,6 +3513,24 @@ public void testSuccessfulAsList_logging_error() throws Exception { assertThat(logged.get(0).getThrown()).isInstanceOf(MyError.class); } + public void testSuccessfulAsList_failureLoggedEvenAfterOutputCancelled() throws Exception { + ListenableFuture input = new CancelPanickingFuture<>(); + ListenableFuture> output = successfulAsList(input); + output.cancel(false); + + List logged = aggregateFutureLogHandler.getStoredLogRecords(); + assertThat(logged).hasSize(1); + assertThat(logged.get(0).getThrown()).hasMessageThat().isEqualTo("You can't fire me, I quit."); + } + + private static final class CancelPanickingFuture extends AbstractFuture { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + setException(new Error("You can't fire me, I quit.")); + return false; + } + } + public void testNonCancellationPropagating_successful() throws Exception { SettableFuture input = SettableFuture.create(); ListenableFuture wrapper = nonCancellationPropagating(input); @@ -3517,22 +3585,6 @@ private static class TestException extends Exception { } } - @GwtIncompatible // used only in GwtIncompatible tests - private static final Function mapper = - new Function() { - @Override - public TestException apply(Exception from) { - if (from instanceof ExecutionException) { - return new TestException(from.getCause()); - } else { - assertTrue( - "got " + from.getClass(), - from instanceof InterruptedException || from instanceof CancellationException); - return new TestException(from); - } - } - }; - @GwtIncompatible // used only in GwtIncompatible tests private interface MapperFunction extends Function {} diff --git a/guava/src/com/google/common/util/concurrent/AggregateFuture.java b/guava/src/com/google/common/util/concurrent/AggregateFuture.java index 90670271e9ea..c58abd1eb8e9 100644 --- a/guava/src/com/google/common/util/concurrent/AggregateFuture.java +++ b/guava/src/com/google/common/util/concurrent/AggregateFuture.java @@ -16,8 +16,11 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.ALL_INPUT_FUTURES_PROCESSED; +import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE; import static com.google.common.util.concurrent.Futures.getDone; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static java.util.logging.Level.SEVERE; import com.google.common.annotations.GwtCompatible; import com.google.common.collect.ImmutableCollection; @@ -26,269 +29,303 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.logging.Level; import java.util.logging.Logger; import org.checkerframework.checker.nullness.qual.Nullable; /** - * A future made up of a collection of sub-futures. + * A future whose value is derived from a collection of input futures. * * @param the type of the individual inputs * @param the type of the output (i.e. this) future */ @GwtCompatible -abstract class AggregateFuture extends AbstractFuture.TrustedFuture { +abstract class AggregateFuture extends AggregateFutureState { private static final Logger logger = Logger.getLogger(AggregateFuture.class.getName()); + /** + * The input futures. After {@link #init}, this field is read only by {@link #afterDone()} (to + * propagate cancellation) and {@link #toString()}. To access the futures' values, {@code + * AggregateFuture} attaches listeners that hold references to one or more inputs. And in the case + * of {@link CombinedFuture}, the user-supplied callback usually has its own references to inputs. + */ /* * In certain circumstances, this field might theoretically not be visible to an afterDone() call * triggered by cancel(). For details, see the comments on the fields of TimeoutFuture. */ - private @Nullable RunningState runningState; + private @Nullable ImmutableCollection> futures; + + private final boolean allMustSucceed; + private final boolean collectsValues; + + AggregateFuture( + ImmutableCollection> futures, + boolean allMustSucceed, + boolean collectsValues) { + super(futures.size()); + this.futures = checkNotNull(futures); + this.allMustSucceed = allMustSucceed; + this.collectsValues = collectsValues; + } @Override protected final void afterDone() { super.afterDone(); - releaseResources(); - } - - protected final void releaseResources() { - RunningState localRunningState = runningState; - if (localRunningState != null) { - // Let go of the memory held by the running state - this.runningState = null; - ImmutableCollection> futures = - localRunningState.futures; - boolean wasInterrupted = wasInterrupted(); - if (wasInterrupted) { - localRunningState.interruptTask(); - } + ImmutableCollection> localFutures = futures; + releaseResources(OUTPUT_FUTURE_DONE); // nulls out `futures` - if (isCancelled() & futures != null) { - for (ListenableFuture future : futures) { - future.cancel(wasInterrupted); - } + if (isCancelled() & localFutures != null) { + boolean wasInterrupted = wasInterrupted(); + for (Future future : localFutures) { + future.cancel(wasInterrupted); } } + /* + * We don't call clearSeenExceptions() until processCompleted(). Prior to that, it may be needed + * again if some outstanding input fails. + */ } @Override - protected String pendingToString() { - RunningState localRunningState = runningState; - if (localRunningState == null) { - return null; - } - ImmutableCollection> localFutures = - localRunningState.futures; + protected final String pendingToString() { + ImmutableCollection> localFutures = futures; if (localFutures != null) { return "futures=[" + localFutures + "]"; } return null; } - /** Must be called at the end of each sub-class's constructor. */ - final void init(RunningState runningState) { - this.runningState = runningState; - runningState.init(); - } - - abstract class RunningState extends AggregateFutureState implements Runnable { - private ImmutableCollection> futures; - private final boolean allMustSucceed; - private final boolean collectsValues; - - RunningState( - ImmutableCollection> futures, - boolean allMustSucceed, - boolean collectsValues) { - super(futures.size()); - this.futures = checkNotNull(futures); - this.allMustSucceed = allMustSucceed; - this.collectsValues = collectsValues; - } - - /* Used in the !allMustSucceed case so we don't have to instantiate a listener. */ - @Override - public final void run() { - decrementCountAndMaybeComplete(); + /** + * Must be called at the end of each subclass's constructor. This method performs the "real" + * initialization; we can't put this in the constructor because, in the case where futures are + * already complete, we would not initialize the subclass before calling {@link + * #collectValueFromNonCancelledFuture}. As this is called after the subclass is constructed, + * we're guaranteed to have properly initialized the subclass. + */ + final void init() { + // Corner case: List is empty. + if (futures.isEmpty()) { + handleAllCompleted(); + return; } - /** - * The "real" initialization; we can't put this in the constructor because, in the case where - * futures are already complete, we would not initialize the subclass before calling {@link - * #handleOneInputDone}. As this is called after the subclass is constructed, we're guaranteed - * to have properly initialized the subclass. - */ - private void init() { - // Corner case: List is empty. - if (futures.isEmpty()) { - handleAllCompleted(); - return; - } - - // NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll - // need to handle RejectedExecutionException + // NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll + // need to handle RejectedExecutionException - if (allMustSucceed) { - // We need fail fast, so we have to keep track of which future failed so we can propagate - // the exception immediately + if (allMustSucceed) { + // We need fail fast, so we have to keep track of which future failed so we can propagate + // the exception immediately - // Register a listener on each Future in the list to update the state of this future. - // Note that if all the futures on the list are done prior to completing this loop, the last - // call to addListener() will callback to setOneValue(), transitively call our cleanup - // listener, and set this.futures to null. - // This is not actually a problem, since the foreach only needs this.futures to be non-null - // at the beginning of the loop. - int i = 0; - for (final ListenableFuture listenable : futures) { - final int index = i++; - listenable.addListener( - new Runnable() { - @Override - public void run() { - try { - handleOneInputDone(index, listenable); - } finally { - decrementCountAndMaybeComplete(); + // Register a listener on each Future in the list to update the state of this future. + // Note that if all the futures on the list are done prior to completing this loop, the last + // call to addListener() will callback to setOneValue(), transitively call our cleanup + // listener, and set this.futures to null. + // This is not actually a problem, since the foreach only needs this.futures to be non-null + // at the beginning of the loop. + int i = 0; + for (final ListenableFuture future : futures) { + final int index = i++; + future.addListener( + new Runnable() { + @Override + public void run() { + try { + if (future.isCancelled()) { + // Clear futures prior to cancelling children. This sets our own state but lets + // the input futures keep running, as some of them may be used elsewhere. + futures = null; + cancel(false); + } else { + collectValueFromNonCancelledFuture(index, future); } + } finally { + /* + * "null" means: There is no need to access `futures` again during + * `processCompleted` because we're reading each value during a call to + * handleOneInputDone. + */ + decrementCountAndMaybeComplete(null); } - }, - directExecutor()); - } - } else { - // We'll only call the callback when all futures complete, regardless of whether some failed - // Hold off on calling setOneValue until all complete, so we can share the same listener - for (ListenableFuture listenable : futures) { - listenable.addListener(this, directExecutor()); - } + } + }, + directExecutor()); + } + } else { + /* + * We'll call the user callback or collect the values only when all inputs complete, + * regardless of whether some failed. This lets us avoid calling expensive methods like + * Future.get() when we don't need to (specifically, for whenAllComplete().call*()), and it + * lets all futures share the same listener. + * + * We store `localFutures` inside the listener because `this.futures` might be nulled out by + * the time the listener runs for the final future -- at which point we need to check all + * inputs for exceptions *if* we're collecting values. If we're not, then the listener doesn't + * need access to the futures again, so we can just pass `null`. + * + * TODO(b/112550045): Allocating a single, cheaper listener is (I think) only an optimization. + * If we make some other optimizations, this one will no longer be necessary. The optimization + * could actually hurt in some cases, as it forces us to keep all inputs in memory until the + * final input completes. + */ + final ImmutableCollection> localFutures = + collectsValues ? futures : null; + Runnable listener = + new Runnable() { + @Override + public void run() { + decrementCountAndMaybeComplete(localFutures); + } + }; + for (ListenableFuture future : futures) { + future.addListener(listener, directExecutor()); } } + } - /** - * Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the - * throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the - * throwable did not cause this future to fail, and it is the first time we've seen that - * particular Throwable. - */ - private void handleException(Throwable throwable) { - checkNotNull(throwable); + /** + * Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the + * throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the + * throwable did not cause this future to fail, and it is the first time we've seen that + * particular Throwable. + */ + private void handleException(Throwable throwable) { + checkNotNull(throwable); - boolean completedWithFailure = false; - boolean firstTimeSeeingThisException = true; - if (allMustSucceed) { - // As soon as the first one fails, throw the exception up. - // The result of all other inputs is then ignored. - completedWithFailure = setException(throwable); - if (completedWithFailure) { - releaseResourcesAfterFailure(); - } else { - // Go up the causal chain to see if we've already seen this cause; if we have, even if - // it's wrapped by a different exception, don't log it. - firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable); + if (allMustSucceed) { + // As soon as the first one fails, make that failure the result of the output future. + // The results of all other inputs are then ignored (except for logging any failures). + boolean completedWithFailure = setException(throwable); + if (!completedWithFailure) { + // Go up the causal chain to see if we've already seen this cause; if we have, even if + // it's wrapped by a different exception, don't log it. + boolean firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable); + if (firstTimeSeeingThisException) { + log(throwable); + return; } } - - // | and & used because it's faster than the branch required for || and && - if (throwable instanceof Error - | (allMustSucceed & !completedWithFailure & firstTimeSeeingThisException)) { - String message = - (throwable instanceof Error) - ? "Input Future failed with Error" - : "Got more than one input Future failure. Logging failures after the first"; - logger.log(Level.SEVERE, message, throwable); - } } - @Override - final void addInitialException(Set seen) { - if (!isCancelled()) { - // TODO(cpovirk): Think about whether we could/should use Verify to check this. - boolean unused = addCausalChain(seen, tryInternalFastPathGetFailure()); - } + /* + * TODO(cpovirk): Should whenAllComplete().call*() log errors, too? Currently, it doesn't call + * handleException() at all. + */ + if (throwable instanceof Error) { + /* + * TODO(cpovirk): Do we really want to log this if we called setException(throwable) and it + * returned true? This was intentional (CL 46470009), but it seems odd compared to how we + * normally handle Error. + * + * Similarly, do we really want to log the same Error more than once? + */ + log(throwable); } + } - /** Handles the input at the given index completing. */ - private void handleOneInputDone(int index, Future future) { - // The only cases in which this Future should already be done are (a) if it was cancelled or - // (b) if an input failed and we propagated that immediately because of allMustSucceed. - checkState( - allMustSucceed || !isDone() || isCancelled(), - "Future was done before all dependencies completed"); + private static void log(Throwable throwable) { + String message = + (throwable instanceof Error) + ? "Input Future failed with Error" + : "Got more than one input Future failure. Logging failures after the first"; + logger.log(SEVERE, message, throwable); + } - try { - checkState(future.isDone(), "Tried to set value from future which is not done"); - if (allMustSucceed) { - if (future.isCancelled()) { - // clear running state prior to cancelling children, this sets our own state but lets - // the input futures keep running as some of them may be used elsewhere. - runningState = null; - cancel(false); - } else { - // We always get the result so that we can have fail-fast, even if we don't collect - InputT result = getDone(future); - if (collectsValues) { - collectOneValue(allMustSucceed, index, result); - } - } - } else if (collectsValues && !future.isCancelled()) { - collectOneValue(allMustSucceed, index, getDone(future)); - } - } catch (ExecutionException e) { - handleException(e.getCause()); - } catch (Throwable t) { - handleException(t); - } + @Override + final void addInitialException(Set seen) { + checkNotNull(seen); + if (!isCancelled()) { + // TODO(cpovirk): Think about whether we could/should use Verify to check this. + boolean unused = addCausalChain(seen, tryInternalFastPathGetFailure()); } + } - private void decrementCountAndMaybeComplete() { - int newRemaining = decrementRemainingAndGet(); - checkState(newRemaining >= 0, "Less than 0 remaining futures"); - if (newRemaining == 0) { - processCompleted(); - } + /** + * Collects the result (success or failure) of one input future. The input must not have been + * cancelled. For details on when this is called, see {@link #collectOneValue}. + */ + private void collectValueFromNonCancelledFuture(int index, Future future) { + try { + // We get the result, even if collectOneValue is a no-op, so that we can fail fast. + collectOneValue(index, getDone(future)); + } catch (ExecutionException e) { + handleException(e.getCause()); + } catch (Throwable t) { + handleException(t); } + } + + private void decrementCountAndMaybeComplete( + @Nullable + ImmutableCollection> + futuresIfNeedToCollectAtCompletion) { + int newRemaining = decrementRemainingAndGet(); + checkState(newRemaining >= 0, "Less than 0 remaining futures"); + if (newRemaining == 0) { + processCompleted(futuresIfNeedToCollectAtCompletion); + } + } - private void processCompleted() { - // Collect the values if (a) our output requires collecting them and (b) we haven't been - // collecting them as we go. (We've collected them as we go only if we needed to fail fast) - if (collectsValues & !allMustSucceed) { - int i = 0; - for (ListenableFuture listenable : futures) { - handleOneInputDone(i++, listenable); + private void processCompleted( + @Nullable + ImmutableCollection> + futuresIfNeedToCollectAtCompletion) { + if (futuresIfNeedToCollectAtCompletion != null) { + int i = 0; + for (Future future : futuresIfNeedToCollectAtCompletion) { + if (!future.isCancelled()) { + collectValueFromNonCancelledFuture(i, future); } + i++; } - handleAllCompleted(); } - - /** - * Listeners implicitly keep a reference to {@link RunningState} as they're inner classes, so we - * free resources here as well for the allMustSucceed=true case (i.e. when a future fails, we - * immediately release resources we no longer need); additionally, the future will release its - * reference to {@link RunningState}, which should free all associated memory when all the - * futures complete and the listeners are released. - * - *

TODO(user): Write tests for memory retention + clearSeenExceptions(); + handleAllCompleted(); + /* + * Null out fields, including some used in handleAllCompleted() above (like + * `CollectionFuture.values`). This might be a no-op: If this future completed during + * handleAllCompleted(), they will already have been nulled out. But in the case of + * whenAll*().call*(), this future may be pending until the callback runs -- or even longer in + * the case of callAsync(), which waits for the callback's returned future to complete. */ - @ForOverride - @OverridingMethodsMustInvokeSuper - void releaseResourcesAfterFailure() { - this.futures = null; - } + releaseResources(ALL_INPUT_FUTURES_PROCESSED); + } - /** - * Called only if {@code collectsValues} is true. - * - *

If {@code allMustSucceed} is true, called as each future completes; otherwise, called for - * each future when all futures complete. + /** + * Clears fields that are no longer needed after this future has completed -- or at least all its + * inputs have completed (more precisely, after {@link #handleAllCompleted()} has been called). + * Often called multiple times (that is, both when the inputs complete and when the output + * completes). + * + *

This is similar to our proposed {@code afterCommit} method but not quite the same. See the + * description of CL 265462958. + */ + // TODO(user): Write more tests for memory retention. + @ForOverride + @OverridingMethodsMustInvokeSuper + void releaseResources(ReleaseResourcesReason reason) { + checkNotNull(reason); + /* + * All elements of `futures` are completed, or this future has already completed and read + * `futures` into a local variable (in preparation for propagating cancellation to them). In + * either case, no one needs to read `futures` for cancellation purposes later. (And + * cancellation purposes are the main reason to access `futures`, as discussed in its docs.) */ - abstract void collectOneValue(boolean allMustSucceed, int index, @Nullable InputT returnValue); - - abstract void handleAllCompleted(); + this.futures = null; + } - void interruptTask() {} + enum ReleaseResourcesReason { + OUTPUT_FUTURE_DONE, + ALL_INPUT_FUTURES_PROCESSED, } + /** + * If {@code allMustSucceed} is true, called as each future completes; otherwise, if {@code + * collectsValues} is true, called for each future when all futures complete. + */ + abstract void collectOneValue(int index, @Nullable InputT returnValue); + + abstract void handleAllCompleted(); + /** Adds the chain to the seen set, and returns whether all the chain was new to us. */ private static boolean addCausalChain(Set seen, Throwable t) { for (; t != null; t = t.getCause()) { diff --git a/guava/src/com/google/common/util/concurrent/AggregateFutureState.java b/guava/src/com/google/common/util/concurrent/AggregateFutureState.java index 040d81363c47..f8398d817eb2 100644 --- a/guava/src/com/google/common/util/concurrent/AggregateFutureState.java +++ b/guava/src/com/google/common/util/concurrent/AggregateFutureState.java @@ -37,9 +37,9 @@ */ @GwtCompatible(emulated = true) @ReflectionSupport(value = ReflectionSupport.Level.FULL) -abstract class AggregateFutureState { +abstract class AggregateFutureState extends AbstractFuture.TrustedFuture { // Lazily initialized the first time we see an exception; not released until all the input futures - // & this future completes. Released when the future releases the reference to the running state + // have completed and we have processed them all. private volatile Set seenExceptions = null; private volatile int remaining; @@ -89,12 +89,27 @@ final Set getOrInitSeenExceptions() { * Thread2: calls setException(), which returns false, CASes seenExceptions to its exception, * and wrongly believes that its exception is new (leading it to logging it when it shouldn't) * - * Our solution is for threads to CAS seenExceptions from null to a Set population with _the + * Our solution is for threads to CAS seenExceptions from null to a Set populated with _the * initial exception_, no matter which thread does the work. This ensures that seenExceptions * always contains not just the current thread's exception but also the initial thread's. */ Set seenExceptionsLocal = seenExceptions; if (seenExceptionsLocal == null) { + // TODO(cpovirk): Should we use a simpler (presumably cheaper) data structure? + /* + * Using weak references here could let us release exceptions earlier, but: + * + * 1. On Android, querying a WeakReference blocks if the GC is doing an otherwise-concurrent + * pass. + * + * 2. We would probably choose to compare exceptions using == instead of equals() (for + * consistency with how weak references are cleared). That's a behavior change -- arguably the + * removal of a feature. + * + * Fortunately, exceptions rarely contain references to expensive resources. + */ + + // seenExceptionsLocal = newConcurrentHashSet(); /* * Other handleException() callers may see this as soon as we publish it. We need to populate @@ -122,6 +137,10 @@ final int decrementRemainingAndGet() { return ATOMIC_HELPER.decrementAndGetRemainingCount(this); } + final void clearSeenExceptions() { + seenExceptions = null; + } + private abstract static class AtomicHelper { /** Atomic compare-and-set of the {@link AggregateFutureState#seenExceptions} field. */ abstract void compareAndSetSeenExceptions( @@ -169,8 +188,7 @@ void compareAndSetSeenExceptions( @Override int decrementAndGetRemainingCount(AggregateFutureState state) { synchronized (state) { - state.remaining--; - return state.remaining; + return --state.remaining; } } } diff --git a/guava/src/com/google/common/util/concurrent/CollectionFuture.java b/guava/src/com/google/common/util/concurrent/CollectionFuture.java index b1412d8dc0c7..e2ddf5ddb8e0 100644 --- a/guava/src/com/google/common/util/concurrent/CollectionFuture.java +++ b/guava/src/com/google/common/util/concurrent/CollectionFuture.java @@ -14,7 +14,6 @@ package com.google.common.util.concurrent; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Lists.newArrayListWithCapacity; import static java.util.Collections.unmodifiableList; @@ -29,83 +28,64 @@ /** Aggregate future that collects (stores) results of each future. */ @GwtCompatible(emulated = true) abstract class CollectionFuture extends AggregateFuture { + private List> values; - abstract class CollectionFutureRunningState extends RunningState { - private List> values; + CollectionFuture( + ImmutableCollection> futures, + boolean allMustSucceed) { + super(futures, allMustSucceed, true); - CollectionFutureRunningState( - ImmutableCollection> futures, - boolean allMustSucceed) { - super(futures, allMustSucceed, true); - - this.values = - futures.isEmpty() - ? ImmutableList.>of() - : Lists.>newArrayListWithCapacity(futures.size()); + this.values = + futures.isEmpty() + ? ImmutableList.>of() + : Lists.>newArrayListWithCapacity(futures.size()); - // Populate the results list with null initially. - for (int i = 0; i < futures.size(); ++i) { - values.add(null); - } + // Populate the results list with null initially. + for (int i = 0; i < futures.size(); ++i) { + values.add(null); } + } - @Override - final void collectOneValue(boolean allMustSucceed, int index, @Nullable V returnValue) { - List> localValues = values; - - if (localValues != null) { - localValues.set(index, Optional.fromNullable(returnValue)); - } else { - // Some other future failed or has been cancelled, causing this one to also be cancelled or - // have an exception set. This should only happen if allMustSucceed is true or if the output - // itself has been cancelled. - checkState( - allMustSucceed || isCancelled(), "Future was done before all dependencies completed"); - } - } - - @Override - final void handleAllCompleted() { - List> localValues = values; - if (localValues != null) { - set(combine(localValues)); - } else { - checkState(isDone()); - } + @Override + final void collectOneValue(int index, @Nullable V returnValue) { + List> localValues = values; + if (localValues != null) { + localValues.set(index, Optional.fromNullable(returnValue)); } + } - @Override - void releaseResourcesAfterFailure() { - super.releaseResourcesAfterFailure(); - this.values = null; + @Override + final void handleAllCompleted() { + List> localValues = values; + if (localValues != null) { + set(combine(localValues)); } + } - abstract C combine(List> values); + @Override + void releaseResources(ReleaseResourcesReason reason) { + super.releaseResources(reason); + this.values = null; } + abstract C combine(List> values); + /** Used for {@link Futures#allAsList} and {@link Futures#successfulAsList}. */ static final class ListFuture extends CollectionFuture> { ListFuture( ImmutableCollection> futures, boolean allMustSucceed) { - init(new ListFutureRunningState(futures, allMustSucceed)); + super(futures, allMustSucceed); + init(); } - private final class ListFutureRunningState extends CollectionFutureRunningState { - ListFutureRunningState( - ImmutableCollection> futures, - boolean allMustSucceed) { - super(futures, allMustSucceed); - } - - @Override - public List combine(List> values) { - List result = newArrayListWithCapacity(values.size()); - for (Optional element : values) { - result.add(element != null ? element.orNull() : null); - } - return unmodifiableList(result); + @Override + public List combine(List> values) { + List result = newArrayListWithCapacity(values.size()); + for (Optional element : values) { + result.add(element != null ? element.orNull() : null); } + return unmodifiableList(result); } } } diff --git a/guava/src/com/google/common/util/concurrent/CombinedFuture.java b/guava/src/com/google/common/util/concurrent/CombinedFuture.java index c50fc0bc7753..d9e433437f10 100644 --- a/guava/src/com/google/common/util/concurrent/CombinedFuture.java +++ b/guava/src/com/google/common/util/concurrent/CombinedFuture.java @@ -15,7 +15,7 @@ package com.google.common.util.concurrent; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; +import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE; import com.google.common.annotations.GwtCompatible; import com.google.common.collect.ImmutableCollection; @@ -30,16 +30,16 @@ /** Aggregate future that computes its value by calling a callable. */ @GwtCompatible final class CombinedFuture extends AggregateFuture { + private CombinedFutureInterruptibleTask task; + CombinedFuture( ImmutableCollection> futures, boolean allMustSucceed, Executor listenerExecutor, AsyncCallable callable) { - init( - new CombinedFutureRunningState( - futures, - allMustSucceed, - new AsyncCallableInterruptibleTask(callable, listenerExecutor))); + super(futures, allMustSucceed, false); + this.task = new AsyncCallableInterruptibleTask(callable, listenerExecutor); + init(); } CombinedFuture( @@ -47,47 +47,42 @@ final class CombinedFuture extends AggregateFuture { boolean allMustSucceed, Executor listenerExecutor, Callable callable) { - init( - new CombinedFutureRunningState( - futures, allMustSucceed, new CallableInterruptibleTask(callable, listenerExecutor))); + super(futures, allMustSucceed, false); + this.task = new CallableInterruptibleTask(callable, listenerExecutor); + init(); } - private final class CombinedFutureRunningState extends RunningState { - private CombinedFutureInterruptibleTask task; - - CombinedFutureRunningState( - ImmutableCollection> futures, - boolean allMustSucceed, - CombinedFutureInterruptibleTask task) { - super(futures, allMustSucceed, false); - this.task = task; - } + @Override + void collectOneValue(int index, @Nullable Object returnValue) {} - @Override - void collectOneValue(boolean allMustSucceed, int index, @Nullable Object returnValue) {} - - @Override - void handleAllCompleted() { - CombinedFutureInterruptibleTask localTask = task; - if (localTask != null) { - localTask.execute(); - } else { - checkState(isDone()); - } + @Override + void handleAllCompleted() { + CombinedFutureInterruptibleTask localTask = task; + if (localTask != null) { + localTask.execute(); } + } - @Override - void releaseResourcesAfterFailure() { - super.releaseResourcesAfterFailure(); + @Override + void releaseResources(ReleaseResourcesReason reason) { + super.releaseResources(reason); + /* + * If the output future is done, then it won't need to interrupt the task later, so it can clear + * its reference to it. + * + * If the output future is *not* done, then the task field will be cleared after the task runs + * or after the output future is done, whichever comes first. + */ + if (reason == OUTPUT_FUTURE_DONE) { this.task = null; } + } - @Override - void interruptTask() { - CombinedFutureInterruptibleTask localTask = task; - if (localTask != null) { - localTask.interruptTask(); - } + @Override + protected void interruptTask() { + CombinedFutureInterruptibleTask localTask = task; + if (localTask != null) { + localTask.interruptTask(); } } @@ -96,7 +91,7 @@ private abstract class CombinedFutureInterruptibleTask extends InterruptibleT private final Executor listenerExecutor; boolean thrownByExecute = true; - public CombinedFutureInterruptibleTask(Executor listenerExecutor) { + CombinedFutureInterruptibleTask(Executor listenerExecutor) { this.listenerExecutor = checkNotNull(listenerExecutor); } @@ -117,6 +112,19 @@ final void execute() { @Override final void afterRanInterruptibly(T result, Throwable error) { + /* + * The future no longer needs to interrupt this task, so it no longer needs a reference to it. + * + * TODO(cpovirk): It might be nice for our InterruptibleTask subclasses to null out their + * `callable` fields automatically. That would make it less important for us to null out the + * reference to `task` here (though it's still nice to do so in case our reference to the + * executor keeps it alive). Ideally, nulling out `callable` would be the responsibility of + * InterruptibleTask itself so that its other subclasses also benefit. (Handling `callable` in + * InterruptibleTask itself might also eliminate some of the existing boilerplate for, e.g., + * pendingToString().) + */ + CombinedFuture.this.task = null; + if (error != null) { if (error instanceof ExecutionException) { setException(error.getCause()); @@ -138,7 +146,7 @@ private final class AsyncCallableInterruptibleTask extends CombinedFutureInterruptibleTask> { private final AsyncCallable callable; - public AsyncCallableInterruptibleTask(AsyncCallable callable, Executor listenerExecutor) { + AsyncCallableInterruptibleTask(AsyncCallable callable, Executor listenerExecutor) { super(listenerExecutor); this.callable = checkNotNull(callable); } @@ -157,9 +165,6 @@ ListenableFuture runInterruptibly() throws Exception { @Override void setValue(ListenableFuture value) { setFuture(value); - // Eagerly release resources instead of waiting for afterDone. We are done with the inputs, - // but the actual future may not complete for arbitrarily long. - releaseResources(); } @Override @@ -172,7 +177,7 @@ String toPendingString() { private final class CallableInterruptibleTask extends CombinedFutureInterruptibleTask { private final Callable callable; - public CallableInterruptibleTask(Callable callable, Executor listenerExecutor) { + CallableInterruptibleTask(Callable callable, Executor listenerExecutor) { super(listenerExecutor); this.callable = checkNotNull(callable); }