Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: a rare race condition in the row merger #1939

Merged
merged 2 commits into from Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -277,7 +277,7 @@ private void deliverUnsafe() {
// Optimization: the inner loop will eager process any accumulated state, so reset the lock
// for just this iteration. (If another event occurs during processing, it can increment the
// lock to enqueue another iteration).
lock.lazySet(1);
lock.set(1);

// Process the upstream message if one exists.
pollUpstream();
Expand Down
Expand Up @@ -15,9 +15,12 @@
*/
package com.google.cloud.bigtable.gaxx.reframing;

import static com.google.common.truth.Truth.assertWithMessage;

import com.google.api.gax.rpc.StreamController;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable.StreamControllerStash;
import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi;
import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockResponseObserver;
import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCall;
import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCallable;
Expand All @@ -27,9 +30,13 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
import com.google.common.truth.Truth;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -431,6 +438,120 @@ public String pop() {
Truth.assertThat(finalError.getSuppressed()[0].getCause()).isSameInstanceAs(fakeCancelError);
}

/**
* Test race between a request() and onComplete (b/295866356). This will stress the concurrency
* primitives in deliver() by running a many iterations across many threads. Some race conditions
* are very subtle and are very rare, so bugs in the implementation would present themselves as
* flakes in this test. All flakes of this test should be investigated as a failure.
*/
@Test
public void testRequestAndCompleteRaceCondition() throws Throwable {
int concurrency = 20;
int iterations = 20_000;

ExecutorService executor = Executors.newFixedThreadPool(concurrency);

List<Future<?>> results = new ArrayList<>();

for (int i = 0; i < concurrency; i++) {
Future<?> result =
executor.submit(
(Callable<Void>)
() -> {
for (int j = 0; j < iterations; j++) {
requestAndCompleteRaceConditionIteration();
}
return null;
});
results.add(result);
}

executor.shutdown();

for (Future<?> result : results) {
try {
result.get();
} catch (ExecutionException e) {
throw e.getCause();
}
}
}

private static void requestAndCompleteRaceConditionIteration()
throws InterruptedException, ExecutionException {
MockStreamingApi.MockResponseObserver<String> observer =
new MockStreamingApi.MockResponseObserver<>(false);
ReframingResponseObserver<String, String> underTest =
new ReframingResponseObserver<>(
observer, new ReframingResponseObserverTest.DasherizingReframer(1));

// This is intentionally not a Phaser, the Phaser seems to drastically reduce the reproduction
// rate of the
// original race condition.
CountDownLatch readySignal = new CountDownLatch(2);
CompletableFuture<Void> startSignal = new CompletableFuture<>();

ExecutorService executor = Executors.newFixedThreadPool(2);

Future<Void> f1 =
executor.submit(
() -> {
// no setup, tell controller thread we are ready and wait for the start signal
readySignal.countDown();
startSignal.get();

// Race start
underTest.onComplete();
// Race end

return null;
});

Future<Void> f2 =
executor.submit(
() -> {
// Setup before race - simulate that the ServerStream iterator got one row and is now
// checking if there
// is another. This is the lead up to the race with grpc's onComplete
underTest.onStart(
new StreamController() {
@Override
public void cancel() {}

@Override
public void disableAutoInboundFlowControl() {}

@Override
public void request(int count) {}
});
observer.getController().request(1);
underTest.onResponse("moo");

// Setup complete, tell controller thread we are ready and wait for the start signal
readySignal.countDown();
startSignal.get();

// Race start
observer.getController().request(1);
// Race end

return null;
});
executor.shutdown();

// Wait for worker setup
readySignal.await();
// Tell workers to race
startSignal.complete(null);

// Wait workers to finish
f1.get();
f2.get();

// the outer observer should be told of the completion of rpc
assertWithMessage("outer observer should not hang").that(observer.isDone()).isTrue();
}

/**
* A simple implementation of a {@link Reframer}. The input string is split by dash, and the
* output is concatenated by dashes. The test can verify M:N behavior by adjusting the
Expand Down