Skip to content

Commit

Permalink
Move ExecutionSequencer into guava as @beta.
Browse files Browse the repository at this point in the history
RELNOTES=Added `ExecutionSequencer`, a new utility to run a series of asynchronous operations serially.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=206609150
  • Loading branch information
clm authored and ronshapiro committed Jul 31, 2018
1 parent f1249c4 commit 8d1fbc9
Show file tree
Hide file tree
Showing 4 changed files with 706 additions and 0 deletions.
@@ -0,0 +1,196 @@
/*
* Copyright (C) 2018 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.common.util.concurrent;

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.util.concurrent.Futures.getDone;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link ExecutionSequencer} */
@RunWith(JUnit4.class)
public class ExecutionSequencerTest {

ExecutorService executor;

private ExecutionSequencer serializer;
private SettableFuture<Void> firstFuture;
private TestCallable firstCallable;

@Before
public void setUp() throws Exception {
executor = Executors.newCachedThreadPool();
serializer = ExecutionSequencer.create();
firstFuture = SettableFuture.create();
firstCallable = new TestCallable(firstFuture);
}

@After
public void tearDown() throws Exception {
executor.shutdown();
}

@Test
public void testCallableStartsAfterFirstFutureCompletes() {
@SuppressWarnings({"unused", "nullness"})
Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
@SuppressWarnings({"unused", "nullness"})
Future<?> possiblyIgnoredError1 = serializer.submitAsync(secondCallable, directExecutor());
assertThat(firstCallable.called).isTrue();
assertThat(secondCallable.called).isFalse();
firstFuture.set(null);
assertThat(secondCallable.called).isTrue();
}

@Test
public void testCancellationNotPropagatedIfAlreadyStarted() {
serializer.submitAsync(firstCallable, directExecutor()).cancel(true);
assertThat(firstFuture.isCancelled()).isFalse();
}

@Test
public void testCancellationDoesNotViolateSerialization() {
@SuppressWarnings({"unused", "nullness"})
Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
ListenableFuture<Void> secondFuture = serializer.submitAsync(secondCallable, directExecutor());
TestCallable thirdCallable = new TestCallable(Futures.<Void>immediateFuture(null));
@SuppressWarnings({"unused", "nullness"})
Future<?> possiblyIgnoredError1 = serializer.submitAsync(thirdCallable, directExecutor());
secondFuture.cancel(true);
assertThat(secondCallable.called).isFalse();
assertThat(thirdCallable.called).isFalse();
firstFuture.set(null);
assertThat(secondCallable.called).isFalse();
assertThat(thirdCallable.called).isTrue();
}

@Test
public void testCancellationMultipleThreads() throws Exception {
final BlockingCallable blockingCallable = new BlockingCallable();
ListenableFuture<Void> unused = serializer.submit(blockingCallable, executor);
ListenableFuture<Boolean> future2 =
serializer.submit(
new Callable<Boolean>() {
@Override
public Boolean call() {
return blockingCallable.isRunning();
}
},
directExecutor());

// Wait for the first task to be started in the background. It will block until we explicitly
// stop it.
blockingCallable.waitForStart();

// Give the second task a chance to (incorrectly) start up while the first task is running.
assertThat(future2.isDone()).isFalse();

// Stop the first task. The second task should then run.
blockingCallable.stop();
executor.shutdown();
assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
assertThat(getDone(future2)).isFalse();
}

@Test
public void secondTaskWaitsForFirstEvenIfCancelled() throws Exception {
final BlockingCallable blockingCallable = new BlockingCallable();
ListenableFuture<Void> future1 = serializer.submit(blockingCallable, executor);
ListenableFuture<Boolean> future2 =
serializer.submit(
new Callable<Boolean>() {
@Override
public Boolean call() {
return blockingCallable.isRunning();
}
},
directExecutor());

// Wait for the first task to be started in the background. It will block until we explicitly
// stop it.
blockingCallable.waitForStart();

// This time, cancel the future for the first task. The task remains running, only the future
// is cancelled.
future1.cancel(false);

// Give the second task a chance to (incorrectly) start up while the first task is running.
// (This is the assertion that fails.)
assertThat(future2.isDone()).isFalse();

// Stop the first task. The second task should then run.
blockingCallable.stop();
executor.shutdown();
assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
assertThat(getDone(future2)).isFalse();
}

private static class BlockingCallable implements Callable<Void> {
private final CountDownLatch startLatch = new CountDownLatch(1);
private final CountDownLatch stopLatch = new CountDownLatch(1);

private volatile boolean running = false;

@Override
public Void call() throws InterruptedException {
running = true;
startLatch.countDown();
stopLatch.await();
running = false;
return null;
}

public void waitForStart() throws InterruptedException {
startLatch.await();
}

public void stop() {
stopLatch.countDown();
}

public boolean isRunning() {
return running;
}
}

private static final class TestCallable implements AsyncCallable<Void> {

private final ListenableFuture<Void> future;
private boolean called = false;

private TestCallable(ListenableFuture<Void> future) {
this.future = future;
}

@Override
public ListenableFuture<Void> call() throws Exception {
called = true;
return future;
}
}
}
@@ -0,0 +1,157 @@
/*
* Copyright (C) 2018 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED;
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN;
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED;
import static com.google.common.util.concurrent.Futures.immediateCancelledFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.annotations.Beta;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

/**
* Serializes execution of a set of operations. This class guarantees that a submitted callable will
* not be called before previously submitted callables (and any {@code Future}s returned from them)
* have completed.
*
* <p>This class implements a superset of the behavior of {@link
* MoreExecutors#newSequentialExecutor}. If your tasks all run on the same underlying executor and
* don't need to wait for {@code Future}s returned from {@code AsyncCallable}s, use it instead.
*
* @since NEXT
*/
@Beta
public final class ExecutionSequencer {

private ExecutionSequencer() {}

/** Creates a new instance. */
public static ExecutionSequencer create() {
return new ExecutionSequencer();
}

enum RunningState {
NOT_RUN,
CANCELLED,
STARTED,
}

/** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */
private final AtomicReference<ListenableFuture<Object>> ref =
new AtomicReference<>(immediateFuture(null));

/**
* Enqueues a task to run when the previous task (if any) completes.
*
* <p>Cancellation does not propagate from the output future to a callable that has begun to
* execute, but if the output future is cancelled before {@link Callable#call()} is invoked,
* {@link Callable#call()} will not be invoked.
*/
public <T> ListenableFuture<T> submit(final Callable<T> callable, Executor executor) {
checkNotNull(callable);
return submitAsync(
new AsyncCallable<T>() {
@Override
public ListenableFuture<T> call() throws Exception {
return immediateFuture(callable.call());
}
},
executor);
}

/**
* Enqueues a task to run when the previous task (if any) completes.
*
* <p>Cancellation does not propagate from the output future to the future returned from {@code
* callable} or a callable that has begun to execute, but if the output future is cancelled before
* {@link AsyncCallable#call()} is invoked, {@link AsyncCallable#call()} will not be invoked.
*/
public <T> ListenableFuture<T> submitAsync(
final AsyncCallable<T> callable, final Executor executor) {
checkNotNull(callable);
final AtomicReference<RunningState> runningState = new AtomicReference<>(NOT_RUN);
final AsyncCallable<T> task =
new AsyncCallable<T>() {
@Override
public ListenableFuture<T> call() throws Exception {
if (!runningState.compareAndSet(NOT_RUN, STARTED)) {
return immediateCancelledFuture();
}
return callable.call();
}
};
/*
* Four futures are at play here:
* taskFuture is the future tracking the result of the callable.
* newFuture is a future that completes after this and all prior tasks are done.
* oldFuture is the previous task's newFuture.
* outputFuture is the future we return to the caller, a nonCancellationPropagating taskFuture.
*
* newFuture is guaranteed to only complete once all tasks previously submitted to this instance
* have completed - namely after oldFuture is done, and taskFuture has either completed or been
* cancelled before the callable started execution.
*/
final SettableFuture<Object> newFuture = SettableFuture.create();

final ListenableFuture<?> oldFuture = ref.getAndSet(newFuture);

// Invoke our task once the previous future completes.
final ListenableFuture<T> taskFuture =
Futures.submitAsync(
task,
new Executor() {
@Override
public void execute(Runnable runnable) {
oldFuture.addListener(runnable, executor);
}
});

final ListenableFuture<T> outputFuture = Futures.nonCancellationPropagating(taskFuture);

// newFuture's lifetime is determined by taskFuture, which can't complete before oldFuture
// unless taskFuture is cancelled, in which case it falls back to oldFuture. This ensures that
// if the future we return is cancelled, we don't begin execution of the next task until after
// oldFuture completes.
Runnable listener =
new Runnable() {
@Override
public void run() {
if (taskFuture.isDone()
// If this CAS succeeds, we know that the provided callable will never be invoked,
// so when oldFuture completes it is safe to allow the next submitted task to
// proceed.
|| (outputFuture.isCancelled() && runningState.compareAndSet(NOT_RUN, CANCELLED))) {
// Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of
// a future that eventually came from immediateFuture(null), this doesn't leak
// throwables or completion values.
newFuture.setFuture(oldFuture);
}
}
};
// Adding the listener to both futures guarantees that newFuture will aways be set. Adding to
// taskFuture guarantees completion if the callable is invoked, and adding to outputFuture
// propagates cancellation if the callable has not yet been invoked.
outputFuture.addListener(listener, directExecutor());
taskFuture.addListener(listener, directExecutor());

return outputFuture;
}
}

0 comments on commit 8d1fbc9

Please sign in to comment.