Suppress BigQuery read stream splitAtFraction when the API call is busy or times out #31125

Merged 5 commits on May 6, 2024
@@ -35,6 +35,12 @@
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
@@ -192,6 +198,7 @@ public static class BigQueryStorageStreamReader<T> extends BoundedSource.Bounded
"split-at-fraction-calls-failed-due-to-other-reasons");
private final Counter successfulSplitCalls =
Metrics.counter(BigQueryStorageStreamReader.class, "split-at-fraction-calls-successful");
private static final ExecutorService executor = Executors.newCachedThreadPool();

private BigQueryStorageStreamReader(
BigQueryStorageStreamSource<T> source, BigQueryOptions options) throws IOException {
@@ -238,8 +245,12 @@ public synchronized boolean advance() throws IOException {
private synchronized boolean readNextRecord() throws IOException {
Iterator<ReadRowsResponse> responseIterator = this.responseIterator;
while (reader.readyForNextReadResponse()) {
// hasNext call has internal retry. Record throttling metrics after called
boolean previous = splitAllowed;
// disallow splitAtFraction (where it also calls hasNext) when iterator busy
splitAllowed = false;
boolean hasNext = responseIterator.hasNext();
splitAllowed = previous;
// hasNext call has internal retry. Record throttling metrics after called
storageClient.reportPendingMetrics();

if (!hasNext) {
@@ -388,8 +399,22 @@ public synchronized BigQueryStorageStreamSource<T> getCurrentSource() {
// The following line is required to trigger the `FailedPreconditionException` on which
// the SplitReadStream validation logic depends. Removing it will cause incorrect
// split operations to succeed.
newResponseIterator.hasNext();
storageClient.reportPendingMetrics();
Contributor Author: Removed the reportPendingMetrics call here; otherwise the same warning as in #31096 appears, because the splitAtFraction call runs on a thread that is not a work-item thread.

Future<Boolean> future = executor.submit(newResponseIterator::hasNext);
try {
// The intended wait time is in sync with splitReadStreamSettings.setRetrySettings in
// StorageClientImpl.
future.get(30, TimeUnit.SECONDS);
@Abacn (Contributor Author), Apr 27, 2024:

This corresponds to the config here:

splitReadStreamSettings.setRetrySettings(
splitReadStreamSettings
.getRetrySettings()
.toBuilder()
.setInitialRpcTimeout(org.threeten.bp.Duration.ofSeconds(30))
.setMaxRpcTimeout(org.threeten.bp.Duration.ofSeconds(30))
.setTotalTimeout(org.threeten.bp.Duration.ofSeconds(30))
.build());

The intent is to cap the API call timeout during a split request at 30 seconds (and otherwise reject the split). However, the bug fix #21739 added this hasNext call, which has its own internal retry logic that is not covered by the split-stream retry config above. When there is a quota issue, this can leave the pipeline stuck.
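The fix applied in the diff can be sketched in isolation: submit the potentially blocking call to an executor, wait with a bounded timeout, and cancel (with interruption) if it does not finish in time. This is a minimal standalone sketch, not Beam code; the class, method names, and the 30-second constant's stand-in are illustrative only.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedCall {
  // Daemon threads so a lingering blocked task cannot keep the JVM alive.
  private static final ExecutorService EXECUTOR =
      Executors.newCachedThreadPool(
          r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
          });

  /** Runs the task with a deadline; returns its result, or null on timeout/failure. */
  static Boolean callWithTimeout(Callable<Boolean> task, long timeoutSeconds) {
    Future<Boolean> future = EXECUTOR.submit(task);
    try {
      return future.get(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException | InterruptedException | ExecutionException e) {
      return null; // the caller treats null as "abandon this split attempt"
    } finally {
      future.cancel(true); // interrupt the still-running task, if any
    }
  }

  public static void main(String[] args) {
    // A task that completes quickly returns its result...
    System.out.println(callWithTimeout(() -> true, 2)); // prints "true"
    // ...while a task blocked past the deadline yields null after ~1 second.
    System.out.println(
        callWithTimeout(
            () -> {
              Thread.sleep(10_000); // simulate a stuck server call
              return true;
            },
            1)); // prints "null"
  }
}
```

Note the `finally { future.cancel(true); }`: without it, a timed-out `hasNext` would keep running (and holding its thread) even after the split attempt is abandoned.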

} catch (TimeoutException | InterruptedException | ExecutionException e) {
badSplitPointCalls.inc();
LOG.info(
"Split of stream {} abandoned because current position check failed with {}.",
source.readStream.getName(),
e.getClass().getName());

return null;
} finally {
future.cancel(true);
}
} catch (FailedPreconditionException e) {
// The current source has already moved past the split point, so this split attempt
// is unsuccessful.
@@ -66,7 +66,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
@@ -1384,6 +1386,89 @@ public KV<String, Long> apply(SchemaAndRecord input) {
}
}

/**
* A mock response whose returned iterator blocks indefinitely when hasNext is called,
* intended to simulate a server-side issue.
*/
static class StuckResponse extends ArrayList<ReadRowsResponse> {

private CountDownLatch latch;

public StuckResponse(CountDownLatch latch) {
this.latch = latch;
}

@Override
public Iterator<ReadRowsResponse> iterator() {
return new StuckIterator();
}

private class StuckIterator implements Iterator<ReadRowsResponse> {

@Override
public boolean hasNext() {
latch.countDown();
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}

@Override
public ReadRowsResponse next() {
return null;
}
}
}

@Test
public void testStreamSourceSplitAtFractionFailsWhenReaderRunning() throws Exception {
ReadSession readSession =
ReadSession.newBuilder()
.setName("readSession")
.setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
.build();

ReadRowsRequest expectedRequest =
ReadRowsRequest.newBuilder().setReadStream("readStream").build();

CountDownLatch latch = new CountDownLatch(1);
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(expectedRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(new StuckResponse(latch)));

BigQueryStorageStreamSource<TableRow> streamSource =
BigQueryStorageStreamSource.create(
readSession,
ReadStream.newBuilder().setName("readStream").build(),
TABLE_SCHEMA,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices().withStorageClient(fakeStorageClient));

BigQueryStorageStreamReader<TableRow> reader = streamSource.createReader(options);

Thread t =
new Thread(
() -> {
try {
reader.start();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
t.start();

// wait until the reader thread has started and is blocked inside hasNext
latch.await();

// splitAtFraction is rejected while the response iterator is busy
assertNull(reader.splitAtFraction(0.5));
Contributor Author: Without the changes to the main scope code, this test fails with

java.lang.NullPointerException
	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageStreamSource$BigQueryStorageStreamReader.splitAtFraction(BigQueryStorageStreamSource.java:372)

because the call passes the splitAllowed check.

t.interrupt();
}
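The latch handshake the test relies on can be reduced to a minimal, self-contained sketch (class and method names are hypothetical, not Beam code): the worker thread counts the latch down just before it blocks, so `await()` returns only once the worker is guaranteed to be past its setup and about to block, which is exactly when the test wants to probe it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class LatchHandshake {
  /** Returns the flag value observed by the main thread after the handshake. */
  static boolean runHandshake() throws InterruptedException {
    CountDownLatch started = new CountDownLatch(1);
    AtomicBoolean busy = new AtomicBoolean(false);

    Thread worker =
        new Thread(
            () -> {
              busy.set(true);
              started.countDown(); // signal: setup done, about to block
              try {
                Thread.sleep(Long.MAX_VALUE); // simulate a stuck server call
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore interrupt status, exit
              }
            });
    worker.start();

    started.await(); // blocks until the worker has signalled
    // Guaranteed true: countDown() happens-after busy.set(true) in the worker,
    // and await() happens-after countDown(), so the write is visible here.
    boolean observed = busy.get();

    worker.interrupt(); // unblock the worker so the thread can terminate
    worker.join();
    return observed;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runHandshake()); // prints "true"
  }
}
```

The same ordering argument is why the test above may safely assert on `splitAtFraction` right after `latch.await()`: the reader is known to be inside the stuck `hasNext` at that point, without any sleep-based guessing.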

@Test
public void testReadFromBigQueryIO() throws Exception {
fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);