diff --git a/android/guava-tests/test/com/google/common/util/concurrent/ExecutionSequencerTest.java b/android/guava-tests/test/com/google/common/util/concurrent/ExecutionSequencerTest.java new file mode 100644 index 000000000000..3c7528f207e7 --- /dev/null +++ b/android/guava-tests/test/com/google/common/util/concurrent/ExecutionSequencerTest.java @@ -0,0 +1,196 @@ +/* + * Copyright (C) 2018 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.common.util.concurrent; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.util.concurrent.Futures.getDone; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ExecutionSequencer} */ +@RunWith(JUnit4.class) +public class ExecutionSequencerTest { + + ExecutorService executor; + + private ExecutionSequencer serializer; + private SettableFuture firstFuture; + private TestCallable firstCallable; + + @Before + public void setUp() throws Exception { + executor = Executors.newCachedThreadPool(); + serializer = ExecutionSequencer.create(); + firstFuture = SettableFuture.create(); + firstCallable = new TestCallable(firstFuture); + } + + @After + public void tearDown() throws Exception { + executor.shutdown(); + } + + @Test + public void testCallableStartsAfterFirstFutureCompletes() { + @SuppressWarnings({"unused", "nullness"}) + Future possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor()); + TestCallable secondCallable = new TestCallable(Futures.immediateFuture(null)); + @SuppressWarnings({"unused", "nullness"}) + Future possiblyIgnoredError1 = serializer.submitAsync(secondCallable, directExecutor()); + assertThat(firstCallable.called).isTrue(); + assertThat(secondCallable.called).isFalse(); + firstFuture.set(null); + assertThat(secondCallable.called).isTrue(); + } + + @Test + public void testCancellationNotPropagatedIfAlreadyStarted() { + serializer.submitAsync(firstCallable, directExecutor()).cancel(true); + assertThat(firstFuture.isCancelled()).isFalse(); + } + + @Test + public void testCancellationDoesNotViolateSerialization() { + @SuppressWarnings({"unused", "nullness"}) + Future possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor()); + TestCallable secondCallable = new TestCallable(Futures.immediateFuture(null)); + ListenableFuture secondFuture = serializer.submitAsync(secondCallable, directExecutor()); + TestCallable thirdCallable = new TestCallable(Futures.immediateFuture(null)); + @SuppressWarnings({"unused", "nullness"}) + Future possiblyIgnoredError1 = serializer.submitAsync(thirdCallable, directExecutor()); + secondFuture.cancel(true); + assertThat(secondCallable.called).isFalse(); + assertThat(thirdCallable.called).isFalse(); + firstFuture.set(null); + assertThat(secondCallable.called).isFalse(); + assertThat(thirdCallable.called).isTrue(); + } + + @Test + public void testCancellationMultipleThreads() throws Exception { + final BlockingCallable blockingCallable = new BlockingCallable(); + ListenableFuture unused = serializer.submit(blockingCallable, executor); + ListenableFuture future2 = + serializer.submit( + new Callable() { + @Override + public Boolean call() { + return blockingCallable.isRunning(); + } + }, + directExecutor()); + + // Wait for the first task to be started in the background. It will block until we explicitly + // stop it. + blockingCallable.waitForStart(); + + // Give the second task a chance to (incorrectly) start up while the first task is running. + assertThat(future2.isDone()).isFalse(); + + // Stop the first task. The second task should then run. + blockingCallable.stop(); + executor.shutdown(); + assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); + assertThat(getDone(future2)).isFalse(); + } + + @Test + public void secondTaskWaitsForFirstEvenIfCancelled() throws Exception { + final BlockingCallable blockingCallable = new BlockingCallable(); + ListenableFuture future1 = serializer.submit(blockingCallable, executor); + ListenableFuture future2 = + serializer.submit( + new Callable() { + @Override + public Boolean call() { + return blockingCallable.isRunning(); + } + }, + directExecutor()); + + // Wait for the first task to be started in the background. It will block until we explicitly + // stop it. + blockingCallable.waitForStart(); + + // This time, cancel the future for the first task. The task remains running, only the future + // is cancelled. + future1.cancel(false); + + // Give the second task a chance to (incorrectly) start up while the first task is running. + // (This is the assertion that fails.) + assertThat(future2.isDone()).isFalse(); + + // Stop the first task. The second task should then run. + blockingCallable.stop(); + executor.shutdown(); + assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); + assertThat(getDone(future2)).isFalse(); + } + + private static class BlockingCallable implements Callable { + private final CountDownLatch startLatch = new CountDownLatch(1); + private final CountDownLatch stopLatch = new CountDownLatch(1); + + private volatile boolean running = false; + + @Override + public Void call() throws InterruptedException { + running = true; + startLatch.countDown(); + stopLatch.await(); + running = false; + return null; + } + + public void waitForStart() throws InterruptedException { + startLatch.await(); + } + + public void stop() { + stopLatch.countDown(); + } + + public boolean isRunning() { + return running; + } + } + + private static final class TestCallable implements AsyncCallable { + + private final ListenableFuture future; + private boolean called = false; + + private TestCallable(ListenableFuture future) { + this.future = future; + } + + @Override + public ListenableFuture call() throws Exception { + called = true; + return future; + } + } +} diff --git a/android/guava/src/com/google/common/util/concurrent/ExecutionSequencer.java b/android/guava/src/com/google/common/util/concurrent/ExecutionSequencer.java new file mode 100644 index 000000000000..3df29c8f72d0 --- /dev/null +++ b/android/guava/src/com/google/common/util/concurrent/ExecutionSequencer.java @@ -0,0 +1,157 @@ +/* + * Copyright (C) 2018 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.common.util.concurrent; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED; +import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN; +import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED; +import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; +import static com.google.common.util.concurrent.Futures.immediateFuture; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + +import com.google.common.annotations.Beta; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Serializes execution of a set of operations. This class guarantees that a submitted callable will + * not be called before previously submitted callables (and any {@code Future}s returned from them) + * have completed. + * + *

This class implements a superset of the behavior of {@link + * MoreExecutors#newSequentialExecutor}. If your tasks all run on the same underlying executor and + * don't need to wait for {@code Future}s returned from {@code AsyncCallable}s, use it instead. + * + * @since NEXT + */ +@Beta +public final class ExecutionSequencer { + + private ExecutionSequencer() {} + + /** Creates a new instance. */ + public static ExecutionSequencer create() { + return new ExecutionSequencer(); + } + + enum RunningState { + NOT_RUN, + CANCELLED, + STARTED, + } + + /** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */ + private final AtomicReference> ref = + new AtomicReference<>(immediateFuture(null)); + + /** + * Enqueues a task to run when the previous task (if any) completes. + * + *

Cancellation does not propagate from the output future to a callable that has begun to + * execute, but if the output future is cancelled before {@link Callable#call()} is invoked, + * {@link Callable#call()} will not be invoked. + */ + public ListenableFuture submit(final Callable callable, Executor executor) { + checkNotNull(callable); + return submitAsync( + new AsyncCallable() { + @Override + public ListenableFuture call() throws Exception { + return immediateFuture(callable.call()); + } + }, + executor); + } + + /** + * Enqueues a task to run when the previous task (if any) completes. + * + *

Cancellation does not propagate from the output future to the future returned from {@code + * callable} or a callable that has begun to execute, but if the output future is cancelled before + * {@link AsyncCallable#call()} is invoked, {@link AsyncCallable#call()} will not be invoked. + */ + public ListenableFuture submitAsync( + final AsyncCallable callable, final Executor executor) { + checkNotNull(callable); + final AtomicReference runningState = new AtomicReference<>(NOT_RUN); + final AsyncCallable task = + new AsyncCallable() { + @Override + public ListenableFuture call() throws Exception { + if (!runningState.compareAndSet(NOT_RUN, STARTED)) { + return immediateCancelledFuture(); + } + return callable.call(); + } + }; + /* + * Four futures are at play here: + * taskFuture is the future tracking the result of the callable. + * newFuture is a future that completes after this and all prior tasks are done. + * oldFuture is the previous task's newFuture. + * outputFuture is the future we return to the caller, a nonCancellationPropagating taskFuture. + * + * newFuture is guaranteed to only complete once all tasks previously submitted to this instance + * have completed - namely after oldFuture is done, and taskFuture has either completed or been + * cancelled before the callable started execution. + */ + final SettableFuture newFuture = SettableFuture.create(); + + final ListenableFuture oldFuture = ref.getAndSet(newFuture); + + // Invoke our task once the previous future completes. + final ListenableFuture taskFuture = + Futures.submitAsync( + task, + new Executor() { + @Override + public void execute(Runnable runnable) { + oldFuture.addListener(runnable, executor); + } + }); + + final ListenableFuture outputFuture = Futures.nonCancellationPropagating(taskFuture); + + // newFuture's lifetime is determined by taskFuture, which can't complete before oldFuture + // unless taskFuture is cancelled, in which case it falls back to oldFuture. This ensures that + // if the future we return is cancelled, we don't begin execution of the next task until after + // oldFuture completes. + Runnable listener = + new Runnable() { + @Override + public void run() { + if (taskFuture.isDone() + // If this CAS succeeds, we know that the provided callable will never be invoked, + // so when oldFuture completes it is safe to allow the next submitted task to + // proceed. + || (outputFuture.isCancelled() && runningState.compareAndSet(NOT_RUN, CANCELLED))) { + // Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of + // a future that eventually came from immediateFuture(null), this doesn't leak + // throwables or completion values. + newFuture.setFuture(oldFuture); + } + } + }; + // Adding the listener to both futures guarantees that newFuture will aways be set. Adding to + // taskFuture guarantees completion if the callable is invoked, and adding to outputFuture + // propagates cancellation if the callable has not yet been invoked. + outputFuture.addListener(listener, directExecutor()); + taskFuture.addListener(listener, directExecutor()); + + return outputFuture; + } +} diff --git a/guava-tests/test/com/google/common/util/concurrent/ExecutionSequencerTest.java b/guava-tests/test/com/google/common/util/concurrent/ExecutionSequencerTest.java new file mode 100644 index 000000000000..3c7528f207e7 --- /dev/null +++ b/guava-tests/test/com/google/common/util/concurrent/ExecutionSequencerTest.java @@ -0,0 +1,196 @@ +/* + * Copyright (C) 2018 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.common.util.concurrent; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.util.concurrent.Futures.getDone; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ExecutionSequencer} */ +@RunWith(JUnit4.class) +public class ExecutionSequencerTest { + + ExecutorService executor; + + private ExecutionSequencer serializer; + private SettableFuture firstFuture; + private TestCallable firstCallable; + + @Before + public void setUp() throws Exception { + executor = Executors.newCachedThreadPool(); + serializer = ExecutionSequencer.create(); + firstFuture = SettableFuture.create(); + firstCallable = new TestCallable(firstFuture); + } + + @After + public void tearDown() throws Exception { + executor.shutdown(); + } + + @Test + public void testCallableStartsAfterFirstFutureCompletes() { + @SuppressWarnings({"unused", "nullness"}) + Future possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor()); + TestCallable secondCallable = new TestCallable(Futures.immediateFuture(null)); + @SuppressWarnings({"unused", "nullness"}) + Future possiblyIgnoredError1 = serializer.submitAsync(secondCallable, directExecutor()); + assertThat(firstCallable.called).isTrue(); + assertThat(secondCallable.called).isFalse(); + firstFuture.set(null); + assertThat(secondCallable.called).isTrue(); + } + + @Test + public void testCancellationNotPropagatedIfAlreadyStarted() { + serializer.submitAsync(firstCallable, directExecutor()).cancel(true); + assertThat(firstFuture.isCancelled()).isFalse(); + } + + @Test + public void testCancellationDoesNotViolateSerialization() { + @SuppressWarnings({"unused", "nullness"}) + Future possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor()); + TestCallable secondCallable = new TestCallable(Futures.immediateFuture(null)); + ListenableFuture secondFuture = serializer.submitAsync(secondCallable, directExecutor()); + TestCallable thirdCallable = new TestCallable(Futures.immediateFuture(null)); + @SuppressWarnings({"unused", "nullness"}) + Future possiblyIgnoredError1 = serializer.submitAsync(thirdCallable, directExecutor()); + secondFuture.cancel(true); + assertThat(secondCallable.called).isFalse(); + assertThat(thirdCallable.called).isFalse(); + firstFuture.set(null); + assertThat(secondCallable.called).isFalse(); + assertThat(thirdCallable.called).isTrue(); + } + + @Test + public void testCancellationMultipleThreads() throws Exception { + final BlockingCallable blockingCallable = new BlockingCallable(); + ListenableFuture unused = serializer.submit(blockingCallable, executor); + ListenableFuture future2 = + serializer.submit( + new Callable() { + @Override + public Boolean call() { + return blockingCallable.isRunning(); + } + }, + directExecutor()); + + // Wait for the first task to be started in the background. It will block until we explicitly + // stop it. + blockingCallable.waitForStart(); + + // Give the second task a chance to (incorrectly) start up while the first task is running. + assertThat(future2.isDone()).isFalse(); + + // Stop the first task. The second task should then run. + blockingCallable.stop(); + executor.shutdown(); + assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); + assertThat(getDone(future2)).isFalse(); + } + + @Test + public void secondTaskWaitsForFirstEvenIfCancelled() throws Exception { + final BlockingCallable blockingCallable = new BlockingCallable(); + ListenableFuture future1 = serializer.submit(blockingCallable, executor); + ListenableFuture future2 = + serializer.submit( + new Callable() { + @Override + public Boolean call() { + return blockingCallable.isRunning(); + } + }, + directExecutor()); + + // Wait for the first task to be started in the background. It will block until we explicitly + // stop it. + blockingCallable.waitForStart(); + + // This time, cancel the future for the first task. The task remains running, only the future + // is cancelled. + future1.cancel(false); + + // Give the second task a chance to (incorrectly) start up while the first task is running. + // (This is the assertion that fails.) + assertThat(future2.isDone()).isFalse(); + + // Stop the first task. The second task should then run. + blockingCallable.stop(); + executor.shutdown(); + assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); + assertThat(getDone(future2)).isFalse(); + } + + private static class BlockingCallable implements Callable { + private final CountDownLatch startLatch = new CountDownLatch(1); + private final CountDownLatch stopLatch = new CountDownLatch(1); + + private volatile boolean running = false; + + @Override + public Void call() throws InterruptedException { + running = true; + startLatch.countDown(); + stopLatch.await(); + running = false; + return null; + } + + public void waitForStart() throws InterruptedException { + startLatch.await(); + } + + public void stop() { + stopLatch.countDown(); + } + + public boolean isRunning() { + return running; + } + } + + private static final class TestCallable implements AsyncCallable { + + private final ListenableFuture future; + private boolean called = false; + + private TestCallable(ListenableFuture future) { + this.future = future; + } + + @Override + public ListenableFuture call() throws Exception { + called = true; + return future; + } + } +} diff --git a/guava/src/com/google/common/util/concurrent/ExecutionSequencer.java b/guava/src/com/google/common/util/concurrent/ExecutionSequencer.java new file mode 100644 index 000000000000..3df29c8f72d0 --- /dev/null +++ b/guava/src/com/google/common/util/concurrent/ExecutionSequencer.java @@ -0,0 +1,157 @@ +/* + * Copyright (C) 2018 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.common.util.concurrent; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED; +import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN; +import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED; +import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; +import static com.google.common.util.concurrent.Futures.immediateFuture; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + +import com.google.common.annotations.Beta; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Serializes execution of a set of operations. This class guarantees that a submitted callable will + * not be called before previously submitted callables (and any {@code Future}s returned from them) + * have completed. + * + *

This class implements a superset of the behavior of {@link + * MoreExecutors#newSequentialExecutor}. If your tasks all run on the same underlying executor and + * don't need to wait for {@code Future}s returned from {@code AsyncCallable}s, use it instead. + * + * @since NEXT + */ +@Beta +public final class ExecutionSequencer { + + private ExecutionSequencer() {} + + /** Creates a new instance. */ + public static ExecutionSequencer create() { + return new ExecutionSequencer(); + } + + enum RunningState { + NOT_RUN, + CANCELLED, + STARTED, + } + + /** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */ + private final AtomicReference> ref = + new AtomicReference<>(immediateFuture(null)); + + /** + * Enqueues a task to run when the previous task (if any) completes. + * + *

Cancellation does not propagate from the output future to a callable that has begun to + * execute, but if the output future is cancelled before {@link Callable#call()} is invoked, + * {@link Callable#call()} will not be invoked. + */ + public ListenableFuture submit(final Callable callable, Executor executor) { + checkNotNull(callable); + return submitAsync( + new AsyncCallable() { + @Override + public ListenableFuture call() throws Exception { + return immediateFuture(callable.call()); + } + }, + executor); + } + + /** + * Enqueues a task to run when the previous task (if any) completes. + * + *

Cancellation does not propagate from the output future to the future returned from {@code + * callable} or a callable that has begun to execute, but if the output future is cancelled before + * {@link AsyncCallable#call()} is invoked, {@link AsyncCallable#call()} will not be invoked. + */ + public ListenableFuture submitAsync( + final AsyncCallable callable, final Executor executor) { + checkNotNull(callable); + final AtomicReference runningState = new AtomicReference<>(NOT_RUN); + final AsyncCallable task = + new AsyncCallable() { + @Override + public ListenableFuture call() throws Exception { + if (!runningState.compareAndSet(NOT_RUN, STARTED)) { + return immediateCancelledFuture(); + } + return callable.call(); + } + }; + /* + * Four futures are at play here: + * taskFuture is the future tracking the result of the callable. + * newFuture is a future that completes after this and all prior tasks are done. + * oldFuture is the previous task's newFuture. + * outputFuture is the future we return to the caller, a nonCancellationPropagating taskFuture. + * + * newFuture is guaranteed to only complete once all tasks previously submitted to this instance + * have completed - namely after oldFuture is done, and taskFuture has either completed or been + * cancelled before the callable started execution. + */ + final SettableFuture newFuture = SettableFuture.create(); + + final ListenableFuture oldFuture = ref.getAndSet(newFuture); + + // Invoke our task once the previous future completes. + final ListenableFuture taskFuture = + Futures.submitAsync( + task, + new Executor() { + @Override + public void execute(Runnable runnable) { + oldFuture.addListener(runnable, executor); + } + }); + + final ListenableFuture outputFuture = Futures.nonCancellationPropagating(taskFuture); + + // newFuture's lifetime is determined by taskFuture, which can't complete before oldFuture + // unless taskFuture is cancelled, in which case it falls back to oldFuture. This ensures that + // if the future we return is cancelled, we don't begin execution of the next task until after + // oldFuture completes. + Runnable listener = + new Runnable() { + @Override + public void run() { + if (taskFuture.isDone() + // If this CAS succeeds, we know that the provided callable will never be invoked, + // so when oldFuture completes it is safe to allow the next submitted task to + // proceed. + || (outputFuture.isCancelled() && runningState.compareAndSet(NOT_RUN, CANCELLED))) { + // Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of + // a future that eventually came from immediateFuture(null), this doesn't leak + // throwables or completion values. + newFuture.setFuture(oldFuture); + } + } + }; + // Adding the listener to both futures guarantees that newFuture will aways be set. Adding to + // taskFuture guarantees completion if the callable is invoked, and adding to outputFuture + // propagates cancellation if the callable has not yet been invoked. + outputFuture.addListener(listener, directExecutor()); + taskFuture.addListener(listener, directExecutor()); + + return outputFuture; + } +}