diff --git a/android/guava-tests/test/com/google/common/util/concurrent/ExecutionSequencerTest.java b/android/guava-tests/test/com/google/common/util/concurrent/ExecutionSequencerTest.java index 98c89259a581..8d369f5b66b5 100644 --- a/android/guava-tests/test/com/google/common/util/concurrent/ExecutionSequencerTest.java +++ b/android/guava-tests/test/com/google/common/util/concurrent/ExecutionSequencerTest.java @@ -15,11 +15,16 @@ package com.google.common.util.concurrent; import static com.google.common.truth.Truth.assertThat; +import static com.google.common.util.concurrent.Futures.allAsList; import static com.google.common.util.concurrent.Futures.getDone; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static java.util.concurrent.TimeUnit.SECONDS; +import com.google.common.annotations.GwtIncompatible; +import com.google.common.testing.GcFinalization; import com.google.common.testing.TestLogHandler; +import com.google.j2objc.annotations.J2ObjCIncompatible; +import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -146,6 +151,36 @@ public Boolean call() { assertThat(getDone(future2)).isFalse(); } + @GwtIncompatible + @J2ObjCIncompatible // gc + @AndroidIncompatible + public void testCancellationWithReferencedObject() throws Exception { + Object toBeGCed = new Object(); + WeakReference ref = new WeakReference<>(toBeGCed); + final SettableFuture settableFuture = SettableFuture.create(); + ListenableFuture ignored = + serializer.submitAsync( + new AsyncCallable() { + @Override + public ListenableFuture call() { + return settableFuture; + } + }, + directExecutor()); + serializer.submit(toStringCallable(toBeGCed), directExecutor()).cancel(true); + toBeGCed = null; + GcFinalization.awaitClear(ref); + } + + private static Callable toStringCallable(final Object object) { + return new Callable() { + @Override + public String call() { + return object.toString(); + } + }; + } + public void testCancellationDuringReentrancy() throws Exception { TestLogHandler logHandler = new TestLogHandler(); Logger.getLogger(AbstractFuture.class.getName()).addHandler(logHandler); @@ -191,6 +226,171 @@ public Void call() { assertThat(logHandler.getStoredLogRecords()).isEmpty(); } + public void testAvoidsStackOverflow_manySubmitted() throws Exception { + final SettableFuture settableFuture = SettableFuture.create(); + ArrayList> results = new ArrayList<>(50_001); + results.add( + serializer.submitAsync( + new AsyncCallable() { + @Override + public ListenableFuture call() { + return settableFuture; + } + }, + directExecutor())); + for (int i = 0; i < 50_000; i++) { + results.add(serializer.submit(Callables.returning(null), directExecutor())); + } + settableFuture.set(null); + getDone(allAsList(results)); + } + + public void testAvoidsStackOverflow_manyCancelled() throws Exception { + final SettableFuture settableFuture = SettableFuture.create(); + ListenableFuture unused = + serializer.submitAsync( + new AsyncCallable() { + @Override + public ListenableFuture call() { + return settableFuture; + } + }, + directExecutor()); + for (int i = 0; i < 50_000; i++) { + serializer.submit(Callables.returning(null), directExecutor()).cancel(true); + } + ListenableFuture stackDepthCheck = + serializer.submit( + new Callable() { + @Override + public Integer call() { + return Thread.currentThread().getStackTrace().length; + } + }, + directExecutor()); + settableFuture.set(null); + assertThat(getDone(stackDepthCheck)) + .isLessThan(Thread.currentThread().getStackTrace().length + 100); + } + + public void testAvoidsStackOverflow_alternatingCancelledAndSubmitted() throws Exception { + final SettableFuture settableFuture = SettableFuture.create(); + ListenableFuture unused = + serializer.submitAsync( + new AsyncCallable() { + @Override + public ListenableFuture call() { + return settableFuture; + } + }, + directExecutor()); + for (int i = 0; i < 25_000; i++) { + serializer.submit(Callables.returning(null), directExecutor()).cancel(true); + unused = serializer.submit(Callables.returning(null), directExecutor()); + } + ListenableFuture stackDepthCheck = + serializer.submit( + new Callable() { + @Override + public Integer call() { + return Thread.currentThread().getStackTrace().length; + } + }, + directExecutor()); + settableFuture.set(null); + assertThat(getDone(stackDepthCheck)) + .isLessThan(Thread.currentThread().getStackTrace().length + 100); + } + + private static final class LongHolder { + long count; + } + + private static final int ITERATION_COUNT = 50_000; + private static final int DIRECT_EXECUTIONS_PER_THREAD = 100; + + @GwtIncompatible // threads + + public void testAvoidsStackOverflow_multipleThreads() throws Exception { + final LongHolder holder = new LongHolder(); + final ArrayList> lengthChecks = new ArrayList<>(); + final List completeLengthChecks; + final int baseStackDepth; + ExecutorService service = Executors.newFixedThreadPool(5); + try { + // Avoid counting frames from the executor itself, or the ExecutionSequencer + baseStackDepth = + serializer + .submit( + new Callable() { + @Override + public Integer call() { + return Thread.currentThread().getStackTrace().length; + } + }, + service) + .get(); + final SettableFuture settableFuture = SettableFuture.create(); + ListenableFuture unused = + serializer.submitAsync( + new AsyncCallable() { + @Override + public ListenableFuture call() { + return settableFuture; + } + }, + directExecutor()); + for (int i = 0; i < 50_000; i++) { + if (i % DIRECT_EXECUTIONS_PER_THREAD == 0) { + // after some number of iterations, switch threads + unused = + serializer.submit( + new Callable() { + @Override + public Void call() { + holder.count++; + return null; + } + }, + service); + } else if (i % DIRECT_EXECUTIONS_PER_THREAD == DIRECT_EXECUTIONS_PER_THREAD - 1) { + // When at max depth, record stack trace depth + lengthChecks.add( + serializer.submit( + new Callable() { + @Override + public Integer call() { + holder.count++; + return Thread.currentThread().getStackTrace().length; + } + }, + directExecutor())); + } else { + // Otherwise, schedule a task on directExecutor + unused = + serializer.submit( + new Callable() { + @Override + public Void call() { + holder.count++; + return null; + } + }, + directExecutor()); + } + } + settableFuture.set(null); + completeLengthChecks = allAsList(lengthChecks).get(); + } finally { + service.shutdown(); + } + assertThat(holder.count).isEqualTo(ITERATION_COUNT); + for (int length : completeLengthChecks) { + // Verify that at max depth, less than one stack frame per submitted task was consumed + assertThat(length - baseStackDepth).isLessThan(DIRECT_EXECUTIONS_PER_THREAD / 2); + } + } + public void testToString() { Future first = serializer.submitAsync(firstCallable, directExecutor()); TestCallable secondCallable = new TestCallable(SettableFuture.create()); diff --git a/android/guava/src/com/google/common/util/concurrent/ExecutionSequencer.java b/android/guava/src/com/google/common/util/concurrent/ExecutionSequencer.java index 88929b6cedbb..872178066d00 100644 --- a/android/guava/src/com/google/common/util/concurrent/ExecutionSequencer.java +++ b/android/guava/src/com/google/common/util/concurrent/ExecutionSequencer.java @@ -15,6 +15,7 @@ package com.google.common.util.concurrent; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED; import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN; import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED; @@ -48,16 +49,48 @@ public static ExecutionSequencer create() { return new ExecutionSequencer(); } - enum RunningState { - NOT_RUN, - CANCELLED, - STARTED, - } - /** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */ private final AtomicReference> ref = new AtomicReference<>(immediateFuture(null)); + private ThreadConfinedTaskQueue latestTaskQueue = new ThreadConfinedTaskQueue(); + + /** + * This object is unsafely published, but avoids problematic races by relying exclusively on the + * identity equality of its Thread field so that the task field is only accessed by a single + * thread. + */ + private static final class ThreadConfinedTaskQueue { + /** + * This field is only used for identity comparisons with the current thread. Field assignments + * are atomic, but do not provide happens-before ordering; however: + * + *
    + *
  • If this field's value == currentThread, we know that it's up to date, because write + * operations in a thread always happen-before subsequent read operations in the same + * thread + *
  • If this field's value == null because of unsafe publication, we know that it isn't the + * object associated with our thread, because if it was the publication wouldn't have been + * unsafe and we'd have seen our thread as the value. This state is also why a new + * ThreadConfinedTaskQueue object must be created for each inline execution, because + * observing a null thread does not mean the object is safe to reuse. + *
  • If this field's value is some other thread object, we know that it's not our thread. + *
  • If this field's value == null because it originally belonged to another thread and that + * thread cleared it, we still know that it's not associated with our thread + *
  • If this field's value == null because it was associated with our thread and was + * cleared, we know that we're not executing inline any more + *
+ * + * All the states where thread != currentThread are identical for our purposes, and so even + * though it's racy, we don't care which of those values we get, so no need to synchronize. + */ + Thread thread; + /** Only used by the thread associated with this object */ + Runnable nextTask; + /** Only used by the thread associated with this object */ + Executor nextExecutor; + } + /** * Enqueues a task to run when the previous task (if any) completes. * @@ -67,6 +100,7 @@ enum RunningState { */ public ListenableFuture submit(final Callable callable, Executor executor) { checkNotNull(callable); + checkNotNull(executor); return submitAsync( new AsyncCallable() { @Override @@ -92,12 +126,13 @@ public String toString() { public ListenableFuture submitAsync( final AsyncCallable callable, final Executor executor) { checkNotNull(callable); - final AtomicReference runningState = new AtomicReference<>(NOT_RUN); + checkNotNull(executor); + final TaskNonReentrantExecutor taskExecutor = new TaskNonReentrantExecutor(executor, this); final AsyncCallable task = new AsyncCallable() { @Override public ListenableFuture call() throws Exception { - if (!runningState.compareAndSet(NOT_RUN, STARTED)) { + if (!taskExecutor.trySetStarted()) { return immediateCancelledFuture(); } return callable.call(); @@ -124,15 +159,8 @@ public String toString() { final ListenableFuture oldFuture = ref.getAndSet(newFuture); // Invoke our task once the previous future completes. - final ListenableFuture taskFuture = - Futures.submitAsync( - task, - new Executor() { - @Override - public void execute(Runnable runnable) { - oldFuture.addListener(runnable, executor); - } - }); + final TrustedListenableFutureTask taskFuture = TrustedListenableFutureTask.create(task); + oldFuture.addListener(taskFuture, taskExecutor); final ListenableFuture outputFuture = Futures.nonCancellationPropagating(taskFuture); @@ -144,15 +172,17 @@ public void execute(Runnable runnable) { new Runnable() { @Override public void run() { - if (taskFuture.isDone() - // If this CAS succeeds, we know that the provided callable will never be invoked, - // so when oldFuture completes it is safe to allow the next submitted task to - // proceed. - || (outputFuture.isCancelled() && runningState.compareAndSet(NOT_RUN, CANCELLED))) { + if (taskFuture.isDone()) { // Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of // a future that eventually came from immediateFuture(null), this doesn't leak // throwables or completion values. newFuture.setFuture(oldFuture); + } else if (outputFuture.isCancelled() && taskExecutor.trySetCancelled()) { + // If this CAS succeeds, we know that the provided callable will never be invoked, + // so when oldFuture completes it is safe to allow the next submitted task to + // proceed. Doing this immediately here lets the next task run without waiting for + // the cancelled task's executor to run the noop AsyncCallable. + taskFuture.cancel(false); } } }; @@ -164,4 +194,163 @@ public void run() { return outputFuture; } + + enum RunningState { + NOT_RUN, + CANCELLED, + STARTED, + } + + /** + * This class helps avoid a StackOverflowError when large numbers of tasks are submitted with + * {@link MoreExecutors#directExecutor}. Normally, when the first future completes, all the other + * tasks would be called recursively. Here, we detect that the delegate executor is executing + * inline, and maintain a queue to dispatch tasks iteratively. There is one instance of this class + * per call to submit() or submitAsync(), and each instance supports only one call to execute(). + * + *

This class would certainly be simpler and easier to reason about if it were built with + * ThreadLocal; however, ThreadLocal is not well optimized for the case where the ThreadLocal is + * non-static, and is initialized/removed frequently - this causes churn in the Thread specific + * hashmaps. Using a static ThreadLocal to avoid that overhead would mean that different + * ExecutionSequencer objects interfere with each other, which would be undesirable, in addition + * to increasing the memory footprint of every thread that interacted with it. In order to release + * entries in thread-specific maps when the ThreadLocal object itself is no longer referenced, + * ThreadLocal is usually implemented with a WeakReference, which can have negative performance + * properties; for example, calling WeakReference.get() on Android will block during an + * otherwise-concurrent GC cycle. + */ + @SuppressWarnings("ShouldNotSubclass") // Saving an allocation here is worth it + private static final class TaskNonReentrantExecutor extends AtomicReference + implements Executor, Runnable { + + /** + * Used to update and read the latestTaskQueue field. Set to null once the runnable has been run + * or queued. + */ + ExecutionSequencer sequencer; + + /** + * Executor the task was set to run on. Set to null when the task has been queued, run, or + * cancelled. + */ + Executor delegate; + + /** + * Set before calling delegate.execute(); set to null once run, so that it can be GCed; this + * object may live on after, if submitAsync returns an incomplete future. + */ + Runnable task; + + /** Thread that called execute(). Set in execute, cleared when delegate.execute() returns. */ + Thread submitting; + + private TaskNonReentrantExecutor(Executor delegate, ExecutionSequencer sequencer) { + super(NOT_RUN); + this.delegate = delegate; + this.sequencer = sequencer; + } + + @Override + public void execute(Runnable task) { + // If this operation was successfully cancelled already, calling the runnable will be a noop. + // This also avoids a race where if outputFuture is cancelled, it will call taskFuture.cancel, + // which will call newFuture.setFuture(oldFuture), to allow the next task in the queue to run + // without waiting for the user's executor to run our submitted Runnable. However, this can + // interact poorly with the reentrancy-avoiding behavior of this executor - when the operation + // before the cancelled future completes, it will synchronously complete both the newFuture + // from the cancelled operation and its own. This can cause one runnable to queue two tasks, + // breaking the invariant this method relies on to iteratively run the next task after the + // previous one completes. + if (get() == RunningState.CANCELLED) { + delegate = null; + sequencer = null; + return; + } + submitting = Thread.currentThread(); + try { + ThreadConfinedTaskQueue submittingTaskQueue = sequencer.latestTaskQueue; + if (submittingTaskQueue.thread == submitting) { + sequencer = null; + // Submit from inside a reentrant submit. We don't know if this one will be reentrant (and + // can't know without submitting something to the executor) so queue to run iteratively. + // Task must be null, since each execution on this executor can only produce one more + // execution. + checkState(submittingTaskQueue.nextTask == null); + submittingTaskQueue.nextTask = task; + submittingTaskQueue.nextExecutor = delegate; + delegate = null; + } else { + Executor localDelegate = delegate; + delegate = null; + this.task = task; + localDelegate.execute(this); + } + } finally { + // Important to null this out here - if we did *not* execute inline, we might still + // run() on the same thread that called execute() - such as in a thread pool, and think + // that it was happening inline. As a side benefit, avoids holding on to the Thread object + // longer than necessary. + submitting = null; + } + } + + @SuppressWarnings("ShortCircuitBoolean") + @Override + public void run() { + Thread currentThread = Thread.currentThread(); + if (currentThread != submitting) { + Runnable localTask = task; + task = null; + localTask.run(); + return; + } + // Executor called reentrantly! Make sure that further calls don't overflow stack. Further + // reentrant calls will see that their current thread is the same as the one set in + // latestTaskQueue, and queue rather than calling execute() directly. + ThreadConfinedTaskQueue executingTaskQueue = new ThreadConfinedTaskQueue(); + executingTaskQueue.thread = currentThread; + // Unconditionally set; there is no risk of throwing away a queued task from another thread, + // because in order for the current task to run on this executor the previous task must have + // already started execution. Because each task on a TaskNonReentrantExecutor can only produce + // one execute() call to another instance from the same ExecutionSequencer, we know by + // induction that the task that launched this one must not have added any other runnables to + // that thread's queue, and thus we cannot be replacing a TaskAndThread object that would + // otherwise have another task queued on to it. Note the exception to this, cancellation, is + // specially handled in execute() - execute() calls triggered by cancellation are no-ops, and + // thus don't count. + sequencer.latestTaskQueue = executingTaskQueue; + sequencer = null; + try { + Runnable localTask = task; + task = null; + localTask.run(); + // Now check if our task attempted to reentrantly execute the next task. + Runnable queuedTask; + Executor queuedExecutor; + // Intentionally using non-short-circuit operator + while ((queuedTask = executingTaskQueue.nextTask) != null + & (queuedExecutor = executingTaskQueue.nextExecutor) != null) { + executingTaskQueue.nextTask = null; + executingTaskQueue.nextExecutor = null; + queuedExecutor.execute(queuedTask); + } + } finally { + // Null out the thread field, so that we don't leak a reference to Thread, and so that + // future `thread == currentThread()` calls from this thread don't incorrectly queue instead + // of executing. Don't null out the latestTaskQueue field, because the work done here + // may have scheduled more operations on another thread, and if those operations then + // trigger reentrant calls that thread will have updated the latestTaskQueue field, and + // we'd be interfering with their operation. + executingTaskQueue.thread = null; + } + } + + private boolean trySetStarted() { + return compareAndSet(NOT_RUN, STARTED); + } + + private boolean trySetCancelled() { + return compareAndSet(NOT_RUN, CANCELLED); + } + } } diff --git a/cycle_whitelist.txt b/cycle_whitelist.txt index e9c70c3ef79a..56d6251b6a91 100644 --- a/cycle_whitelist.txt +++ b/cycle_whitelist.txt @@ -15,6 +15,8 @@ NAMESPACE org.junit # ***** REAL CYCLES ***** # Inverses (currently not solvable by weakening a reference) FIELD com.google.common.base.Converter.reverse +# Cycle exists until future completes +FIELD com.google.common.util.concurrent.AbstractFuture.Listener.executor com.google.common.util.concurrent.ExecutionSequencer.TaskNonReentrantExecutor # ***** FALSE POSITIVES ***** @@ -38,6 +40,8 @@ FIELD com.google.common.collect.TreeTraverser.PostOrderNode.childIterator FIELD com.google.common.collect.TreeTraverser.PreOrderIterator.stack FIELD com.google.common.util.concurrent.AbstractFuture.Listener.executor com.google.common.util.concurrent.MoreExecutors.rejectionPropagatingExecutor.$ FIELD com.google.common.util.concurrent.AbstractService.listeners +# NonReentrantExecutor is not exposed to this field +FIELD com.google.common.util.concurrent.ExecutionSequencer.ThreadConfinedTaskQueue.nextExecutor com.google.common.util.concurrent.ExecutionSequencer.TaskNonReentrantExecutor # Real cycle, but the runningState field is null'ed on completion of the future. FIELD com.google.common.util.concurrent.AggregateFuture.runningState FIELD java.util.AbstractMap.keySet com.google.common.collect.AbstractMapBasedMultimap.NavigableKeySet diff --git a/guava-tests/test/com/google/common/util/concurrent/ExecutionSequencerTest.java b/guava-tests/test/com/google/common/util/concurrent/ExecutionSequencerTest.java index 98c89259a581..8d369f5b66b5 100644 --- a/guava-tests/test/com/google/common/util/concurrent/ExecutionSequencerTest.java +++ b/guava-tests/test/com/google/common/util/concurrent/ExecutionSequencerTest.java @@ -15,11 +15,16 @@ package com.google.common.util.concurrent; import static com.google.common.truth.Truth.assertThat; +import static com.google.common.util.concurrent.Futures.allAsList; import static com.google.common.util.concurrent.Futures.getDone; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static java.util.concurrent.TimeUnit.SECONDS; +import com.google.common.annotations.GwtIncompatible; +import com.google.common.testing.GcFinalization; import com.google.common.testing.TestLogHandler; +import com.google.j2objc.annotations.J2ObjCIncompatible; +import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -146,6 +151,36 @@ public Boolean call() { assertThat(getDone(future2)).isFalse(); } + @GwtIncompatible + @J2ObjCIncompatible // gc + @AndroidIncompatible + public void testCancellationWithReferencedObject() throws Exception { + Object toBeGCed = new Object(); + WeakReference ref = new WeakReference<>(toBeGCed); + final SettableFuture settableFuture = SettableFuture.create(); + ListenableFuture ignored = + serializer.submitAsync( + new AsyncCallable() { + @Override + public ListenableFuture call() { + return settableFuture; + } + }, + directExecutor()); + serializer.submit(toStringCallable(toBeGCed), directExecutor()).cancel(true); + toBeGCed = null; + GcFinalization.awaitClear(ref); + } + + private static Callable toStringCallable(final Object object) { + return new Callable() { + @Override + public String call() { + return object.toString(); + } + }; + } + public void testCancellationDuringReentrancy() throws Exception { TestLogHandler logHandler = new TestLogHandler(); Logger.getLogger(AbstractFuture.class.getName()).addHandler(logHandler); @@ -191,6 +226,171 @@ public Void call() { assertThat(logHandler.getStoredLogRecords()).isEmpty(); } + public void testAvoidsStackOverflow_manySubmitted() throws Exception { + final SettableFuture settableFuture = SettableFuture.create(); + ArrayList> results = new ArrayList<>(50_001); + results.add( + serializer.submitAsync( + new AsyncCallable() { + @Override + public ListenableFuture call() { + return settableFuture; + } + }, + directExecutor())); + for (int i = 0; i < 50_000; i++) { + results.add(serializer.submit(Callables.returning(null), directExecutor())); + } + settableFuture.set(null); + getDone(allAsList(results)); + } + + public void testAvoidsStackOverflow_manyCancelled() throws Exception { + final SettableFuture settableFuture = SettableFuture.create(); + ListenableFuture unused = + serializer.submitAsync( + new AsyncCallable() { + @Override + public ListenableFuture call() { + return settableFuture; + } + }, + directExecutor()); + for (int i = 0; i < 50_000; i++) { + serializer.submit(Callables.returning(null), directExecutor()).cancel(true); + } + ListenableFuture stackDepthCheck = + serializer.submit( + new Callable() { + @Override + public Integer call() { + return Thread.currentThread().getStackTrace().length; + } + }, + directExecutor()); + settableFuture.set(null); + assertThat(getDone(stackDepthCheck)) + .isLessThan(Thread.currentThread().getStackTrace().length + 100); + } + + public void testAvoidsStackOverflow_alternatingCancelledAndSubmitted() throws Exception { + final SettableFuture settableFuture = SettableFuture.create(); + ListenableFuture unused = + serializer.submitAsync( + new AsyncCallable() { + @Override + public ListenableFuture call() { + return settableFuture; + } + }, + directExecutor()); + for (int i = 0; i < 25_000; i++) { + serializer.submit(Callables.returning(null), directExecutor()).cancel(true); + unused = serializer.submit(Callables.returning(null), directExecutor()); + } + ListenableFuture stackDepthCheck = + serializer.submit( + new Callable() { + @Override + public Integer call() { + return Thread.currentThread().getStackTrace().length; + } + }, + directExecutor()); + settableFuture.set(null); + assertThat(getDone(stackDepthCheck)) + .isLessThan(Thread.currentThread().getStackTrace().length + 100); + } + + private static final class LongHolder { + long count; + } + + private static final int ITERATION_COUNT = 50_000; + private static final int DIRECT_EXECUTIONS_PER_THREAD = 100; + + @GwtIncompatible // threads + + public void testAvoidsStackOverflow_multipleThreads() throws Exception { + final LongHolder holder = new LongHolder(); + final ArrayList> lengthChecks = new ArrayList<>(); + final List completeLengthChecks; + final int baseStackDepth; + ExecutorService service = Executors.newFixedThreadPool(5); + try { + // Avoid counting frames from the executor itself, or the ExecutionSequencer + baseStackDepth = + serializer + .submit( + new Callable() { + @Override + public Integer call() { + return Thread.currentThread().getStackTrace().length; + } + }, + service) + .get(); + final SettableFuture settableFuture = SettableFuture.create(); + ListenableFuture unused = + serializer.submitAsync( + new AsyncCallable() { + @Override + public ListenableFuture call() { + return settableFuture; + } + }, + directExecutor()); + for (int i = 0; i < 50_000; i++) { + if (i % DIRECT_EXECUTIONS_PER_THREAD == 0) { + // after some number of iterations, switch threads + unused = + serializer.submit( + new Callable() { + @Override + public Void call() { + holder.count++; + return null; + } + }, + service); + } else if (i % DIRECT_EXECUTIONS_PER_THREAD == DIRECT_EXECUTIONS_PER_THREAD - 1) { + // When at max depth, record stack trace depth + lengthChecks.add( + serializer.submit( + new Callable() { + @Override + public Integer call() { + holder.count++; + return Thread.currentThread().getStackTrace().length; + } + }, + directExecutor())); + } else { + // Otherwise, schedule a task on directExecutor + unused = + serializer.submit( + new Callable() { + @Override + public Void call() { + holder.count++; + return null; + } + }, + directExecutor()); + } + } + settableFuture.set(null); + completeLengthChecks = allAsList(lengthChecks).get(); + } finally { + service.shutdown(); + } + assertThat(holder.count).isEqualTo(ITERATION_COUNT); + for (int length : completeLengthChecks) { + // Verify that at max depth, less than one stack frame per submitted task was consumed + assertThat(length - baseStackDepth).isLessThan(DIRECT_EXECUTIONS_PER_THREAD / 2); + } + } + public void testToString() { Future first = serializer.submitAsync(firstCallable, directExecutor()); TestCallable secondCallable = new TestCallable(SettableFuture.create()); diff --git a/guava/src/com/google/common/util/concurrent/ExecutionSequencer.java b/guava/src/com/google/common/util/concurrent/ExecutionSequencer.java index 88929b6cedbb..872178066d00 100644 --- a/guava/src/com/google/common/util/concurrent/ExecutionSequencer.java +++ b/guava/src/com/google/common/util/concurrent/ExecutionSequencer.java @@ -15,6 +15,7 @@ package com.google.common.util.concurrent; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED; import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN; import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED; @@ -48,16 +49,48 @@ public static ExecutionSequencer create() { return new ExecutionSequencer(); } - enum RunningState { - NOT_RUN, - CANCELLED, - STARTED, - } - /** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */ private final AtomicReference> ref = new AtomicReference<>(immediateFuture(null)); + private ThreadConfinedTaskQueue latestTaskQueue = new ThreadConfinedTaskQueue(); + + /** + * This object is unsafely published, but avoids problematic races by relying exclusively on the + * identity equality of its Thread field so that the task field is only accessed by a single + * thread. + */ + private static final class ThreadConfinedTaskQueue { + /** + * This field is only used for identity comparisons with the current thread. Field assignments + * are atomic, but do not provide happens-before ordering; however: + * + *
    + *
  • If this field's value == currentThread, we know that it's up to date, because write + * operations in a thread always happen-before subsequent read operations in the same + * thread + *
  • If this field's value == null because of unsafe publication, we know that it isn't the + * object associated with our thread, because if it was the publication wouldn't have been + * unsafe and we'd have seen our thread as the value. This state is also why a new + * ThreadConfinedTaskQueue object must be created for each inline execution, because + * observing a null thread does not mean the object is safe to reuse. + *
  • If this field's value is some other thread object, we know that it's not our thread. + *
  • If this field's value == null because it originally belonged to another thread and that + * thread cleared it, we still know that it's not associated with our thread + *
  • If this field's value == null because it was associated with our thread and was + * cleared, we know that we're not executing inline any more + *
+ * + * All the states where thread != currentThread are identical for our purposes, and so even + * though it's racy, we don't care which of those values we get, so no need to synchronize. + */ + Thread thread; + /** Only used by the thread associated with this object */ + Runnable nextTask; + /** Only used by the thread associated with this object */ + Executor nextExecutor; + } + /** * Enqueues a task to run when the previous task (if any) completes. * @@ -67,6 +100,7 @@ enum RunningState { */ public ListenableFuture submit(final Callable callable, Executor executor) { checkNotNull(callable); + checkNotNull(executor); return submitAsync( new AsyncCallable() { @Override @@ -92,12 +126,13 @@ public String toString() { public ListenableFuture submitAsync( final AsyncCallable callable, final Executor executor) { checkNotNull(callable); - final AtomicReference runningState = new AtomicReference<>(NOT_RUN); + checkNotNull(executor); + final TaskNonReentrantExecutor taskExecutor = new TaskNonReentrantExecutor(executor, this); final AsyncCallable task = new AsyncCallable() { @Override public ListenableFuture call() throws Exception { - if (!runningState.compareAndSet(NOT_RUN, STARTED)) { + if (!taskExecutor.trySetStarted()) { return immediateCancelledFuture(); } return callable.call(); @@ -124,15 +159,8 @@ public String toString() { final ListenableFuture oldFuture = ref.getAndSet(newFuture); // Invoke our task once the previous future completes. - final ListenableFuture taskFuture = - Futures.submitAsync( - task, - new Executor() { - @Override - public void execute(Runnable runnable) { - oldFuture.addListener(runnable, executor); - } - }); + final TrustedListenableFutureTask taskFuture = TrustedListenableFutureTask.create(task); + oldFuture.addListener(taskFuture, taskExecutor); final ListenableFuture outputFuture = Futures.nonCancellationPropagating(taskFuture); @@ -144,15 +172,17 @@ public void execute(Runnable runnable) { new Runnable() { @Override public void run() { - if (taskFuture.isDone() - // If this CAS succeeds, we know that the provided callable will never be invoked, - // so when oldFuture completes it is safe to allow the next submitted task to - // proceed. - || (outputFuture.isCancelled() && runningState.compareAndSet(NOT_RUN, CANCELLED))) { + if (taskFuture.isDone()) { // Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of // a future that eventually came from immediateFuture(null), this doesn't leak // throwables or completion values. newFuture.setFuture(oldFuture); + } else if (outputFuture.isCancelled() && taskExecutor.trySetCancelled()) { + // If this CAS succeeds, we know that the provided callable will never be invoked, + // so when oldFuture completes it is safe to allow the next submitted task to + // proceed. Doing this immediately here lets the next task run without waiting for + // the cancelled task's executor to run the noop AsyncCallable. + taskFuture.cancel(false); } } }; @@ -164,4 +194,163 @@ public void run() { return outputFuture; } + + enum RunningState { + NOT_RUN, + CANCELLED, + STARTED, + } + + /** + * This class helps avoid a StackOverflowError when large numbers of tasks are submitted with + * {@link MoreExecutors#directExecutor}. Normally, when the first future completes, all the other + * tasks would be called recursively. Here, we detect that the delegate executor is executing + * inline, and maintain a queue to dispatch tasks iteratively. There is one instance of this class + * per call to submit() or submitAsync(), and each instance supports only one call to execute(). + * + *

This class would certainly be simpler and easier to reason about if it were built with + * ThreadLocal; however, ThreadLocal is not well optimized for the case where the ThreadLocal is + * non-static, and is initialized/removed frequently - this causes churn in the Thread specific + * hashmaps. Using a static ThreadLocal to avoid that overhead would mean that different + * ExecutionSequencer objects interfere with each other, which would be undesirable, in addition + * to increasing the memory footprint of every thread that interacted with it. In order to release + * entries in thread-specific maps when the ThreadLocal object itself is no longer referenced, + * ThreadLocal is usually implemented with a WeakReference, which can have negative performance + * properties; for example, calling WeakReference.get() on Android will block during an + * otherwise-concurrent GC cycle. + */ + @SuppressWarnings("ShouldNotSubclass") // Saving an allocation here is worth it + private static final class TaskNonReentrantExecutor extends AtomicReference + implements Executor, Runnable { + + /** + * Used to update and read the latestTaskQueue field. Set to null once the runnable has been run + * or queued. + */ + ExecutionSequencer sequencer; + + /** + * Executor the task was set to run on. Set to null when the task has been queued, run, or + * cancelled. + */ + Executor delegate; + + /** + * Set before calling delegate.execute(); set to null once run, so that it can be GCed; this + * object may live on after, if submitAsync returns an incomplete future. + */ + Runnable task; + + /** Thread that called execute(). Set in execute, cleared when delegate.execute() returns. */ + Thread submitting; + + private TaskNonReentrantExecutor(Executor delegate, ExecutionSequencer sequencer) { + super(NOT_RUN); + this.delegate = delegate; + this.sequencer = sequencer; + } + + @Override + public void execute(Runnable task) { + // If this operation was successfully cancelled already, calling the runnable will be a noop. + // This also avoids a race where if outputFuture is cancelled, it will call taskFuture.cancel, + // which will call newFuture.setFuture(oldFuture), to allow the next task in the queue to run + // without waiting for the user's executor to run our submitted Runnable. However, this can + // interact poorly with the reentrancy-avoiding behavior of this executor - when the operation + // before the cancelled future completes, it will synchronously complete both the newFuture + // from the cancelled operation and its own. This can cause one runnable to queue two tasks, + // breaking the invariant this method relies on to iteratively run the next task after the + // previous one completes. + if (get() == RunningState.CANCELLED) { + delegate = null; + sequencer = null; + return; + } + submitting = Thread.currentThread(); + try { + ThreadConfinedTaskQueue submittingTaskQueue = sequencer.latestTaskQueue; + if (submittingTaskQueue.thread == submitting) { + sequencer = null; + // Submit from inside a reentrant submit. We don't know if this one will be reentrant (and + // can't know without submitting something to the executor) so queue to run iteratively. + // Task must be null, since each execution on this executor can only produce one more + // execution. + checkState(submittingTaskQueue.nextTask == null); + submittingTaskQueue.nextTask = task; + submittingTaskQueue.nextExecutor = delegate; + delegate = null; + } else { + Executor localDelegate = delegate; + delegate = null; + this.task = task; + localDelegate.execute(this); + } + } finally { + // Important to null this out here - if we did *not* execute inline, we might still + // run() on the same thread that called execute() - such as in a thread pool, and think + // that it was happening inline. As a side benefit, avoids holding on to the Thread object + // longer than necessary. + submitting = null; + } + } + + @SuppressWarnings("ShortCircuitBoolean") + @Override + public void run() { + Thread currentThread = Thread.currentThread(); + if (currentThread != submitting) { + Runnable localTask = task; + task = null; + localTask.run(); + return; + } + // Executor called reentrantly! Make sure that further calls don't overflow stack. Further + // reentrant calls will see that their current thread is the same as the one set in + // latestTaskQueue, and queue rather than calling execute() directly. + ThreadConfinedTaskQueue executingTaskQueue = new ThreadConfinedTaskQueue(); + executingTaskQueue.thread = currentThread; + // Unconditionally set; there is no risk of throwing away a queued task from another thread, + // because in order for the current task to run on this executor the previous task must have + // already started execution. Because each task on a TaskNonReentrantExecutor can only produce + // one execute() call to another instance from the same ExecutionSequencer, we know by + // induction that the task that launched this one must not have added any other runnables to + // that thread's queue, and thus we cannot be replacing a TaskAndThread object that would + // otherwise have another task queued on to it. Note the exception to this, cancellation, is + // specially handled in execute() - execute() calls triggered by cancellation are no-ops, and + // thus don't count. + sequencer.latestTaskQueue = executingTaskQueue; + sequencer = null; + try { + Runnable localTask = task; + task = null; + localTask.run(); + // Now check if our task attempted to reentrantly execute the next task. + Runnable queuedTask; + Executor queuedExecutor; + // Intentionally using non-short-circuit operator + while ((queuedTask = executingTaskQueue.nextTask) != null + & (queuedExecutor = executingTaskQueue.nextExecutor) != null) { + executingTaskQueue.nextTask = null; + executingTaskQueue.nextExecutor = null; + queuedExecutor.execute(queuedTask); + } + } finally { + // Null out the thread field, so that we don't leak a reference to Thread, and so that + // future `thread == currentThread()` calls from this thread don't incorrectly queue instead + // of executing. Don't null out the latestTaskQueue field, because the work done here + // may have scheduled more operations on another thread, and if those operations then + // trigger reentrant calls that thread will have updated the latestTaskQueue field, and + // we'd be interfering with their operation. + executingTaskQueue.thread = null; + } + } + + private boolean trySetStarted() { + return compareAndSet(NOT_RUN, STARTED); + } + + private boolean trySetCancelled() { + return compareAndSet(NOT_RUN, CANCELLED); + } + } }