Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Move ExecutionSequencer into guava as @beta.
RELNOTES=Added `ExecutionSequencer`, a new utility to run a series of asynchronous operations serially. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=206609150
- Loading branch information
1 parent
f1249c4
commit 8d1fbc9
Showing
4 changed files
with
706 additions
and
0 deletions.
There are no files selected for viewing
196 changes: 196 additions & 0 deletions
196
android/guava-tests/test/com/google/common/util/concurrent/ExecutionSequencerTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
/* | ||
* Copyright (C) 2018 The Guava Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except | ||
* in compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
* or implied. See the License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package com.google.common.util.concurrent; | ||
|
||
import static com.google.common.truth.Truth.assertThat; | ||
import static com.google.common.util.concurrent.Futures.getDone; | ||
import static com.google.common.util.concurrent.MoreExecutors.directExecutor; | ||
|
||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.TimeUnit; | ||
import org.junit.After; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
import org.junit.runner.RunWith; | ||
import org.junit.runners.JUnit4; | ||
|
||
/** Tests for {@link ExecutionSequencer} */ | ||
@RunWith(JUnit4.class) | ||
public class ExecutionSequencerTest { | ||
|
||
ExecutorService executor; | ||
|
||
private ExecutionSequencer serializer; | ||
private SettableFuture<Void> firstFuture; | ||
private TestCallable firstCallable; | ||
|
||
@Before | ||
public void setUp() throws Exception { | ||
executor = Executors.newCachedThreadPool(); | ||
serializer = ExecutionSequencer.create(); | ||
firstFuture = SettableFuture.create(); | ||
firstCallable = new TestCallable(firstFuture); | ||
} | ||
|
||
@After | ||
public void tearDown() throws Exception { | ||
executor.shutdown(); | ||
} | ||
|
||
@Test | ||
public void testCallableStartsAfterFirstFutureCompletes() { | ||
@SuppressWarnings({"unused", "nullness"}) | ||
Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor()); | ||
TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null)); | ||
@SuppressWarnings({"unused", "nullness"}) | ||
Future<?> possiblyIgnoredError1 = serializer.submitAsync(secondCallable, directExecutor()); | ||
assertThat(firstCallable.called).isTrue(); | ||
assertThat(secondCallable.called).isFalse(); | ||
firstFuture.set(null); | ||
assertThat(secondCallable.called).isTrue(); | ||
} | ||
|
||
@Test | ||
public void testCancellationNotPropagatedIfAlreadyStarted() { | ||
serializer.submitAsync(firstCallable, directExecutor()).cancel(true); | ||
assertThat(firstFuture.isCancelled()).isFalse(); | ||
} | ||
|
||
@Test | ||
public void testCancellationDoesNotViolateSerialization() { | ||
@SuppressWarnings({"unused", "nullness"}) | ||
Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor()); | ||
TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null)); | ||
ListenableFuture<Void> secondFuture = serializer.submitAsync(secondCallable, directExecutor()); | ||
TestCallable thirdCallable = new TestCallable(Futures.<Void>immediateFuture(null)); | ||
@SuppressWarnings({"unused", "nullness"}) | ||
Future<?> possiblyIgnoredError1 = serializer.submitAsync(thirdCallable, directExecutor()); | ||
secondFuture.cancel(true); | ||
assertThat(secondCallable.called).isFalse(); | ||
assertThat(thirdCallable.called).isFalse(); | ||
firstFuture.set(null); | ||
assertThat(secondCallable.called).isFalse(); | ||
assertThat(thirdCallable.called).isTrue(); | ||
} | ||
|
||
@Test | ||
public void testCancellationMultipleThreads() throws Exception { | ||
final BlockingCallable blockingCallable = new BlockingCallable(); | ||
ListenableFuture<Void> unused = serializer.submit(blockingCallable, executor); | ||
ListenableFuture<Boolean> future2 = | ||
serializer.submit( | ||
new Callable<Boolean>() { | ||
@Override | ||
public Boolean call() { | ||
return blockingCallable.isRunning(); | ||
} | ||
}, | ||
directExecutor()); | ||
|
||
// Wait for the first task to be started in the background. It will block until we explicitly | ||
// stop it. | ||
blockingCallable.waitForStart(); | ||
|
||
// Give the second task a chance to (incorrectly) start up while the first task is running. | ||
assertThat(future2.isDone()).isFalse(); | ||
|
||
// Stop the first task. The second task should then run. | ||
blockingCallable.stop(); | ||
executor.shutdown(); | ||
assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); | ||
assertThat(getDone(future2)).isFalse(); | ||
} | ||
|
||
@Test | ||
public void secondTaskWaitsForFirstEvenIfCancelled() throws Exception { | ||
final BlockingCallable blockingCallable = new BlockingCallable(); | ||
ListenableFuture<Void> future1 = serializer.submit(blockingCallable, executor); | ||
ListenableFuture<Boolean> future2 = | ||
serializer.submit( | ||
new Callable<Boolean>() { | ||
@Override | ||
public Boolean call() { | ||
return blockingCallable.isRunning(); | ||
} | ||
}, | ||
directExecutor()); | ||
|
||
// Wait for the first task to be started in the background. It will block until we explicitly | ||
// stop it. | ||
blockingCallable.waitForStart(); | ||
|
||
// This time, cancel the future for the first task. The task remains running, only the future | ||
// is cancelled. | ||
future1.cancel(false); | ||
|
||
// Give the second task a chance to (incorrectly) start up while the first task is running. | ||
// (This is the assertion that fails.) | ||
assertThat(future2.isDone()).isFalse(); | ||
|
||
// Stop the first task. The second task should then run. | ||
blockingCallable.stop(); | ||
executor.shutdown(); | ||
assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); | ||
assertThat(getDone(future2)).isFalse(); | ||
} | ||
|
||
private static class BlockingCallable implements Callable<Void> { | ||
private final CountDownLatch startLatch = new CountDownLatch(1); | ||
private final CountDownLatch stopLatch = new CountDownLatch(1); | ||
|
||
private volatile boolean running = false; | ||
|
||
@Override | ||
public Void call() throws InterruptedException { | ||
running = true; | ||
startLatch.countDown(); | ||
stopLatch.await(); | ||
running = false; | ||
return null; | ||
} | ||
|
||
public void waitForStart() throws InterruptedException { | ||
startLatch.await(); | ||
} | ||
|
||
public void stop() { | ||
stopLatch.countDown(); | ||
} | ||
|
||
public boolean isRunning() { | ||
return running; | ||
} | ||
} | ||
|
||
private static final class TestCallable implements AsyncCallable<Void> { | ||
|
||
private final ListenableFuture<Void> future; | ||
private boolean called = false; | ||
|
||
private TestCallable(ListenableFuture<Void> future) { | ||
this.future = future; | ||
} | ||
|
||
@Override | ||
public ListenableFuture<Void> call() throws Exception { | ||
called = true; | ||
return future; | ||
} | ||
} | ||
} |
157 changes: 157 additions & 0 deletions
157
android/guava/src/com/google/common/util/concurrent/ExecutionSequencer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
/* | ||
* Copyright (C) 2018 The Guava Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except | ||
* in compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
* or implied. See the License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package com.google.common.util.concurrent; | ||
|
||
import static com.google.common.base.Preconditions.checkNotNull; | ||
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED; | ||
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN; | ||
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED; | ||
import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; | ||
import static com.google.common.util.concurrent.Futures.immediateFuture; | ||
import static com.google.common.util.concurrent.MoreExecutors.directExecutor; | ||
|
||
import com.google.common.annotations.Beta; | ||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
||
/** | ||
* Serializes execution of a set of operations. This class guarantees that a submitted callable will | ||
* not be called before previously submitted callables (and any {@code Future}s returned from them) | ||
* have completed. | ||
* | ||
* <p>This class implements a superset of the behavior of {@link | ||
* MoreExecutors#newSequentialExecutor}. If your tasks all run on the same underlying executor and | ||
* don't need to wait for {@code Future}s returned from {@code AsyncCallable}s, use it instead. | ||
* | ||
* @since NEXT | ||
*/ | ||
@Beta | ||
public final class ExecutionSequencer { | ||
|
||
private ExecutionSequencer() {} | ||
|
||
/** Creates a new instance. */ | ||
public static ExecutionSequencer create() { | ||
return new ExecutionSequencer(); | ||
} | ||
|
||
enum RunningState { | ||
NOT_RUN, | ||
CANCELLED, | ||
STARTED, | ||
} | ||
|
||
/** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */ | ||
private final AtomicReference<ListenableFuture<Object>> ref = | ||
new AtomicReference<>(immediateFuture(null)); | ||
|
||
/** | ||
* Enqueues a task to run when the previous task (if any) completes. | ||
* | ||
* <p>Cancellation does not propagate from the output future to a callable that has begun to | ||
* execute, but if the output future is cancelled before {@link Callable#call()} is invoked, | ||
* {@link Callable#call()} will not be invoked. | ||
*/ | ||
public <T> ListenableFuture<T> submit(final Callable<T> callable, Executor executor) { | ||
checkNotNull(callable); | ||
return submitAsync( | ||
new AsyncCallable<T>() { | ||
@Override | ||
public ListenableFuture<T> call() throws Exception { | ||
return immediateFuture(callable.call()); | ||
} | ||
}, | ||
executor); | ||
} | ||
|
||
/** | ||
* Enqueues a task to run when the previous task (if any) completes. | ||
* | ||
* <p>Cancellation does not propagate from the output future to the future returned from {@code | ||
* callable} or a callable that has begun to execute, but if the output future is cancelled before | ||
* {@link AsyncCallable#call()} is invoked, {@link AsyncCallable#call()} will not be invoked. | ||
*/ | ||
public <T> ListenableFuture<T> submitAsync( | ||
final AsyncCallable<T> callable, final Executor executor) { | ||
checkNotNull(callable); | ||
final AtomicReference<RunningState> runningState = new AtomicReference<>(NOT_RUN); | ||
final AsyncCallable<T> task = | ||
new AsyncCallable<T>() { | ||
@Override | ||
public ListenableFuture<T> call() throws Exception { | ||
if (!runningState.compareAndSet(NOT_RUN, STARTED)) { | ||
return immediateCancelledFuture(); | ||
} | ||
return callable.call(); | ||
} | ||
}; | ||
/* | ||
* Four futures are at play here: | ||
* taskFuture is the future tracking the result of the callable. | ||
* newFuture is a future that completes after this and all prior tasks are done. | ||
* oldFuture is the previous task's newFuture. | ||
* outputFuture is the future we return to the caller, a nonCancellationPropagating taskFuture. | ||
* | ||
* newFuture is guaranteed to only complete once all tasks previously submitted to this instance | ||
* have completed - namely after oldFuture is done, and taskFuture has either completed or been | ||
* cancelled before the callable started execution. | ||
*/ | ||
final SettableFuture<Object> newFuture = SettableFuture.create(); | ||
|
||
final ListenableFuture<?> oldFuture = ref.getAndSet(newFuture); | ||
|
||
// Invoke our task once the previous future completes. | ||
final ListenableFuture<T> taskFuture = | ||
Futures.submitAsync( | ||
task, | ||
new Executor() { | ||
@Override | ||
public void execute(Runnable runnable) { | ||
oldFuture.addListener(runnable, executor); | ||
} | ||
}); | ||
|
||
final ListenableFuture<T> outputFuture = Futures.nonCancellationPropagating(taskFuture); | ||
|
||
// newFuture's lifetime is determined by taskFuture, which can't complete before oldFuture | ||
// unless taskFuture is cancelled, in which case it falls back to oldFuture. This ensures that | ||
// if the future we return is cancelled, we don't begin execution of the next task until after | ||
// oldFuture completes. | ||
Runnable listener = | ||
new Runnable() { | ||
@Override | ||
public void run() { | ||
if (taskFuture.isDone() | ||
// If this CAS succeeds, we know that the provided callable will never be invoked, | ||
// so when oldFuture completes it is safe to allow the next submitted task to | ||
// proceed. | ||
|| (outputFuture.isCancelled() && runningState.compareAndSet(NOT_RUN, CANCELLED))) { | ||
// Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of | ||
// a future that eventually came from immediateFuture(null), this doesn't leak | ||
// throwables or completion values. | ||
newFuture.setFuture(oldFuture); | ||
} | ||
} | ||
}; | ||
// Adding the listener to both futures guarantees that newFuture will aways be set. Adding to | ||
// taskFuture guarantees completion if the callable is invoked, and adding to outputFuture | ||
// propagates cancellation if the callable has not yet been invoked. | ||
outputFuture.addListener(listener, directExecutor()); | ||
taskFuture.addListener(listener, directExecutor()); | ||
|
||
return outputFuture; | ||
} | ||
} |
Oops, something went wrong.