Skip to content

Commit

Permalink
Suppress BigQuery read stream splitAtFraction when API busy call or t…
Browse files Browse the repository at this point in the history
…imeout (#31125)

* Reject split attempt early when there is an ongoing hasNext call
  in work item thread

* Timeout hasNext call in splitAtFraction in alignment with client
  splitReadStreamSettings
  • Loading branch information
Abacn committed May 6, 2024
1 parent a22ee68 commit 26c4ee4
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
Expand Down Expand Up @@ -192,6 +198,7 @@ public static class BigQueryStorageStreamReader<T> extends BoundedSource.Bounded
"split-at-fraction-calls-failed-due-to-other-reasons");
private final Counter successfulSplitCalls =
Metrics.counter(BigQueryStorageStreamReader.class, "split-at-fraction-calls-successful");
private static final ExecutorService executor = Executors.newCachedThreadPool();

private BigQueryStorageStreamReader(
BigQueryStorageStreamSource<T> source, BigQueryOptions options) throws IOException {
Expand Down Expand Up @@ -238,8 +245,12 @@ public synchronized boolean advance() throws IOException {
private synchronized boolean readNextRecord() throws IOException {
Iterator<ReadRowsResponse> responseIterator = this.responseIterator;
while (reader.readyForNextReadResponse()) {
// hasNext call has internal retry. Record throttling metrics after called
boolean previous = splitAllowed;
// disallow splitAtFraction (where it also calls hasNext) when iterator busy
splitAllowed = false;
boolean hasNext = responseIterator.hasNext();
splitAllowed = previous;
// hasNext call has internal retry. Record throttling metrics after called
storageClient.reportPendingMetrics();

if (!hasNext) {
Expand Down Expand Up @@ -388,8 +399,22 @@ public synchronized BigQueryStorageStreamSource<T> getCurrentSource() {
// The following line is required to trigger the `FailedPreconditionException` on which
// the SplitReadStream validation logic depends. Removing it will cause incorrect
// split operations to succeed.
newResponseIterator.hasNext();
storageClient.reportPendingMetrics();
Future<Boolean> future = executor.submit(newResponseIterator::hasNext);
try {
// The intended wait time is in sync with splitReadStreamSettings.setRetrySettings in
// StorageClientImpl.
future.get(30, TimeUnit.SECONDS);
} catch (TimeoutException | InterruptedException | ExecutionException e) {
badSplitPointCalls.inc();
LOG.info(
"Split of stream {} abandoned because current position check failed with {}.",
source.readStream.getName(),
e.getClass().getName());

return null;
} finally {
future.cancel(true);
}
} catch (FailedPreconditionException e) {
// The current source has already moved past the split point, so this split attempt
// is unsuccessful.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
Expand Down Expand Up @@ -1384,6 +1386,89 @@ public KV<String, Long> apply(SchemaAndRecord input) {
}
}

/**
 * A mock response whose iterator blocks indefinitely when {@code hasNext()} is called, intended
 * to simulate a server-side issue.
 *
 * <p>The supplied latch is counted down as soon as {@code hasNext()} is entered, so a test can
 * wait until the calling thread is actually stuck before exercising other code paths.
 */
static class StuckResponse extends ArrayList<ReadRowsResponse> {

  /** Counted down on entry to {@code hasNext()}, just before the call starts sleeping. */
  private final CountDownLatch latch;

  /**
   * @param latch latch that is counted down each time the returned iterator's {@code hasNext()}
   *     is entered
   */
  public StuckResponse(CountDownLatch latch) {
    this.latch = latch;
  }

  /** Returns a fresh iterator whose {@code hasNext()} blocks until the thread is interrupted. */
  @Override
  public Iterator<ReadRowsResponse> iterator() {
    return new StuckIterator();
  }

  /** Iterator that never yields an element; it only blocks in {@code hasNext()}. */
  private class StuckIterator implements Iterator<ReadRowsResponse> {

    /**
     * Signals the latch, then sleeps until interrupted.
     *
     * @return always {@code false}, once the sleeping thread has been interrupted
     */
    @Override
    public boolean hasNext() {
      latch.countDown();
      try {
        Thread.sleep(Long.MAX_VALUE);
      } catch (InterruptedException e) {
        // Restore the interrupt flag so the caller can still observe the interruption.
        Thread.currentThread().interrupt();
      }
      return false;
    }

    /**
     * Always fails: {@code hasNext()} never reports an element, so per the {@link Iterator}
     * contract there is nothing to return.
     *
     * @throws java.util.NoSuchElementException always
     */
    @Override
    public ReadRowsResponse next() {
      // Fully qualified so this class does not depend on an import that may not be present.
      throw new java.util.NoSuchElementException("StuckIterator never produces elements");
    }
  }
}

/**
 * Verifies that {@code splitAtFraction} returns {@code null} while the reader thread is blocked
 * inside the response iterator's {@code hasNext()} call.
 */
@Test
public void testStreamSourceSplitAtFractionFailsWhenReaderRunning() throws Exception {
  ReadSession readSession =
      ReadSession.newBuilder()
          .setName("readSession")
          .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
          .build();

  ReadRowsRequest expectedRequest =
      ReadRowsRequest.newBuilder().setReadStream("readStream").build();

  // Counted down by StuckResponse's iterator once hasNext() has been entered.
  CountDownLatch latch = new CountDownLatch(1);
  StorageClient fakeStorageClient = mock(StorageClient.class);
  when(fakeStorageClient.readRows(expectedRequest, ""))
      .thenReturn(new FakeBigQueryServerStream<>(new StuckResponse(latch)));

  BigQueryStorageStreamSource<TableRow> streamSource =
      BigQueryStorageStreamSource.create(
          readSession,
          ReadStream.newBuilder().setName("readStream").build(),
          TABLE_SCHEMA,
          new TableRowParser(),
          TableRowJsonCoder.of(),
          new FakeBigQueryServices().withStorageClient(fakeStorageClient));

  BigQueryStorageStreamReader<TableRow> reader = streamSource.createReader(options);

  Thread t =
      new Thread(
          () -> {
            try {
              reader.start();
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
          });
  // Daemon, so a reader thread that somehow stays stuck cannot keep the test JVM alive.
  t.setDaemon(true);
  t.start();

  try {
    // Wait until the reader thread is inside hasNext(). The wait is bounded so that a reader
    // which fails before reaching hasNext() fails the test instead of hanging it forever.
    if (!latch.await(30, java.util.concurrent.TimeUnit.SECONDS)) {
      throw new AssertionError("Reader thread did not reach hasNext() within 30 seconds");
    }

    // splitAtFraction is rejected while the response iterator is busy.
    assertNull(reader.splitAtFraction(0.5));
  } finally {
    // Always release the stuck reader thread, even when the assertion above fails.
    t.interrupt();
    t.join(5_000L);
  }
}

@Test
public void testReadFromBigQueryIO() throws Exception {
fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
Expand Down

0 comments on commit 26c4ee4

Please sign in to comment.