-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Suppress BigQuery read stream splitAtFraction when API busy call or timeout #31125
Changes from all commits
9b1b009
6169dd1
44b33fd
409556e
4b3dd6a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -35,6 +35,12 @@ | |||||||||||||||||
import java.util.Iterator; | ||||||||||||||||||
import java.util.List; | ||||||||||||||||||
import java.util.NoSuchElementException; | ||||||||||||||||||
import java.util.concurrent.ExecutionException; | ||||||||||||||||||
import java.util.concurrent.ExecutorService; | ||||||||||||||||||
import java.util.concurrent.Executors; | ||||||||||||||||||
import java.util.concurrent.Future; | ||||||||||||||||||
import java.util.concurrent.TimeUnit; | ||||||||||||||||||
import java.util.concurrent.TimeoutException; | ||||||||||||||||||
import org.apache.beam.runners.core.metrics.ServiceCallMetric; | ||||||||||||||||||
import org.apache.beam.sdk.coders.Coder; | ||||||||||||||||||
import org.apache.beam.sdk.io.BoundedSource; | ||||||||||||||||||
|
@@ -192,6 +198,7 @@ public static class BigQueryStorageStreamReader<T> extends BoundedSource.Bounded | |||||||||||||||||
"split-at-fraction-calls-failed-due-to-other-reasons"); | ||||||||||||||||||
private final Counter successfulSplitCalls = | ||||||||||||||||||
Metrics.counter(BigQueryStorageStreamReader.class, "split-at-fraction-calls-successful"); | ||||||||||||||||||
private static final ExecutorService executor = Executors.newCachedThreadPool(); | ||||||||||||||||||
|
||||||||||||||||||
private BigQueryStorageStreamReader( | ||||||||||||||||||
BigQueryStorageStreamSource<T> source, BigQueryOptions options) throws IOException { | ||||||||||||||||||
|
@@ -238,8 +245,12 @@ public synchronized boolean advance() throws IOException { | |||||||||||||||||
private synchronized boolean readNextRecord() throws IOException { | ||||||||||||||||||
Iterator<ReadRowsResponse> responseIterator = this.responseIterator; | ||||||||||||||||||
while (reader.readyForNextReadResponse()) { | ||||||||||||||||||
// hasNext call has internal retry. Record throttling metrics after called | ||||||||||||||||||
boolean previous = splitAllowed; | ||||||||||||||||||
// disallow splitAtFraction (where it also calls hasNext) when iterator busy | ||||||||||||||||||
splitAllowed = false; | ||||||||||||||||||
boolean hasNext = responseIterator.hasNext(); | ||||||||||||||||||
splitAllowed = previous; | ||||||||||||||||||
// hasNext call has internal retry. Record throttling metrics after called | ||||||||||||||||||
storageClient.reportPendingMetrics(); | ||||||||||||||||||
|
||||||||||||||||||
if (!hasNext) { | ||||||||||||||||||
|
@@ -388,8 +399,22 @@ public synchronized BigQueryStorageStreamSource<T> getCurrentSource() { | |||||||||||||||||
// The following line is required to trigger the `FailedPreconditionException` on which | ||||||||||||||||||
// the SplitReadStream validation logic depends. Removing it will cause incorrect | ||||||||||||||||||
// split operations to succeed. | ||||||||||||||||||
newResponseIterator.hasNext(); | ||||||||||||||||||
storageClient.reportPendingMetrics(); | ||||||||||||||||||
Future<Boolean> future = executor.submit(newResponseIterator::hasNext); | ||||||||||||||||||
try { | ||||||||||||||||||
// The intended wait time is in sync with splitReadStreamSettings.setRetrySettings in | ||||||||||||||||||
// StorageClientImpl. | ||||||||||||||||||
future.get(30, TimeUnit.SECONDS); | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This corresponds to the config here Lines 1735 to 1742 in 673da54
It is intended to set the API call timeout during a split request to 30s (and otherwise reject the split). However, a bug fix (#21739) added this call here, which has internal retry logic, but this added call is not covered by the split-stream-retry config above. When there is a quota issue, this causes the pipeline to get stuck. |
||||||||||||||||||
} catch (TimeoutException | InterruptedException | ExecutionException e) { | ||||||||||||||||||
badSplitPointCalls.inc(); | ||||||||||||||||||
LOG.info( | ||||||||||||||||||
"Split of stream {} abandoned because current position check failed with {}.", | ||||||||||||||||||
source.readStream.getName(), | ||||||||||||||||||
e.getClass().getName()); | ||||||||||||||||||
|
||||||||||||||||||
return null; | ||||||||||||||||||
} finally { | ||||||||||||||||||
future.cancel(true); | ||||||||||||||||||
} | ||||||||||||||||||
} catch (FailedPreconditionException e) { | ||||||||||||||||||
// The current source has already moved past the split point, so this split attempt | ||||||||||||||||||
// is unsuccessful. | ||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,7 +66,9 @@ | |
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.concurrent.CountDownLatch; | ||
import org.apache.arrow.memory.BufferAllocator; | ||
import org.apache.arrow.memory.RootAllocator; | ||
import org.apache.arrow.vector.BigIntVector; | ||
|
@@ -1384,6 +1386,89 @@ public KV<String, Long> apply(SchemaAndRecord input) { | |
} | ||
} | ||
|
||
/** | ||
* A mock response that gets stuck indefinitely when the returned iterator's hasNext is called, | ||
* intended to simulate server side issue. | ||
*/ | ||
static class StuckResponse extends ArrayList<ReadRowsResponse> { | ||
|
||
private CountDownLatch latch; | ||
|
||
public StuckResponse(CountDownLatch latch) { | ||
this.latch = latch; | ||
} | ||
|
||
@Override | ||
public Iterator<ReadRowsResponse> iterator() { | ||
return new StuckIterator(); | ||
} | ||
|
||
private class StuckIterator implements Iterator<ReadRowsResponse> { | ||
|
||
@Override | ||
public boolean hasNext() { | ||
latch.countDown(); | ||
try { | ||
Thread.sleep(Long.MAX_VALUE); | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
} | ||
return false; | ||
} | ||
|
||
@Override | ||
public ReadRowsResponse next() { | ||
return null; | ||
} | ||
} | ||
} | ||
|
||
@Test | ||
public void testStreamSourceSplitAtFractionFailsWhenReaderRunning() throws Exception { | ||
ReadSession readSession = | ||
ReadSession.newBuilder() | ||
.setName("readSession") | ||
.setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)) | ||
.build(); | ||
|
||
ReadRowsRequest expectedRequest = | ||
ReadRowsRequest.newBuilder().setReadStream("readStream").build(); | ||
|
||
CountDownLatch latch = new CountDownLatch(1); | ||
StorageClient fakeStorageClient = mock(StorageClient.class); | ||
when(fakeStorageClient.readRows(expectedRequest, "")) | ||
.thenReturn(new FakeBigQueryServerStream<>(new StuckResponse(latch))); | ||
|
||
BigQueryStorageStreamSource<TableRow> streamSource = | ||
BigQueryStorageStreamSource.create( | ||
readSession, | ||
ReadStream.newBuilder().setName("readStream").build(), | ||
TABLE_SCHEMA, | ||
new TableRowParser(), | ||
TableRowJsonCoder.of(), | ||
new FakeBigQueryServices().withStorageClient(fakeStorageClient)); | ||
|
||
BigQueryStorageStreamReader<TableRow> reader = streamSource.createReader(options); | ||
|
||
Thread t = | ||
new Thread( | ||
() -> { | ||
try { | ||
reader.start(); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
}); | ||
t.start(); | ||
|
||
// wait until thread proceed | ||
latch.await(); | ||
|
||
// SplitAfFraction rejected while response iterator busy | ||
assertNull(reader.splitAtFraction(0.5)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Without the changes in the main scope code, the test fails with
because it passed the "splitAllowed" check |
||
t.interrupt(); | ||
} | ||
|
||
@Test | ||
public void testReadFromBigQueryIO() throws Exception { | ||
fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the reportPendingMetrics call here; otherwise the same warning as in #31096 is seen, because the splitAtFraction calling thread isn't a work item thread.