Skip to content

Commit

Permalink
fix: add a timeout on retry for retryable errors (#1930)
Browse files Browse the repository at this point in the history
* fix: add a timeout on retry for retryable errors

* .

* fix clirr

* .

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Update README.md

* Update README.md

* .

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* .

* .

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
yirutang and gcf-owl-bot[bot] committed Jan 13, 2023
1 parent cf08ff8 commit 2d648cf
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 10 deletions.
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.0'
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.1'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.0"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.1"
```

## Authentication
Expand Down
10 changes: 10 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Expand Up @@ -76,4 +76,14 @@
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
<method>com.google.cloud.bigquery.storage.v1.TableSchema getUpdatedSchema()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
<method>ConnectionWorker(java.lang.String, com.google.cloud.bigquery.storage.v1.ProtoSchema, long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool</className>
<method>ConnectionWorkerPool(long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)</method>
</difference>
</differences>
Expand Up @@ -31,6 +31,7 @@
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Duration;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
Expand Down Expand Up @@ -61,6 +62,7 @@ public class ConnectionWorker implements AutoCloseable {
private Lock lock;
private Condition hasMessageInWaitingQueue;
private Condition inflightReduced;
private static Duration maxRetryDuration = Duration.ofMinutes(5);

/*
* The identifier of the current stream to write to. This stream name can change during
Expand Down Expand Up @@ -114,6 +116,9 @@ public class ConnectionWorker implements AutoCloseable {
@GuardedBy("lock")
private long conectionRetryCountWithoutCallback = 0;

@GuardedBy("lock")
private long connectionRetryStartTime = 0;

/*
* If false, streamConnection needs to be reset.
*/
Expand Down Expand Up @@ -201,6 +206,7 @@ public ConnectionWorker(
ProtoSchema writerSchema,
long maxInflightRequests,
long maxInflightBytes,
Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
BigQueryWriteClient client,
Expand All @@ -210,6 +216,7 @@ public ConnectionWorker(
this.hasMessageInWaitingQueue = lock.newCondition();
this.inflightReduced = lock.newCondition();
this.streamName = streamName;
this.maxRetryDuration = maxRetryDuration;
if (writerSchema == null) {
throw new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
Expand Down Expand Up @@ -237,6 +244,7 @@ public void run() {
}

private void resetConnection() {
log.info("Reconnecting for stream:" + streamName);
this.streamConnection =
new StreamConnection(
this.client,
Expand Down Expand Up @@ -618,6 +626,9 @@ private void requestCallback(AppendRowsResponse response) {
if (conectionRetryCountWithoutCallback != 0) {
conectionRetryCountWithoutCallback = 0;
}
if (connectionRetryStartTime != 0) {
connectionRetryStartTime = 0;
}
if (!this.inflightRequestQueue.isEmpty()) {
requestWrapper = pollInflightRequestQueue();
} else if (inflightCleanuped) {
Expand Down Expand Up @@ -686,15 +697,25 @@ private void doneCallback(Throwable finalStatus) {
try {
this.streamConnectionIsConnected = false;
if (connectionFinalStatus == null) {
if (connectionRetryStartTime == 0) {
connectionRetryStartTime = System.currentTimeMillis();
}
// If the error can be retried, don't set it here, let it try to retry later on.
if (isRetriableError(finalStatus) && !userClosed) {
if (isRetriableError(finalStatus)
&& !userClosed
&& (maxRetryDuration.toMillis() == 0f
|| System.currentTimeMillis() - connectionRetryStartTime
<= maxRetryDuration.toMillis())) {
this.conectionRetryCountWithoutCallback++;
log.info(
"Retriable error "
+ finalStatus.toString()
+ " received, retry count "
+ conectionRetryCountWithoutCallback
+ " for stream "
+ ", millis left to retry "
+ (maxRetryDuration.toMillis()
- (System.currentTimeMillis() - connectionRetryStartTime))
+ ", for stream "
+ streamName);
} else {
Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus);
Expand Down
Expand Up @@ -57,6 +57,11 @@ public class ConnectionWorkerPool {
*/
private final long maxInflightBytes;

/*
* Max retry duration for retryable errors.
*/
private final java.time.Duration maxRetryDuration;

/*
* Behavior when inflight queue is exceeded. Only supports Block or Throw, default is Block.
*/
Expand Down Expand Up @@ -196,12 +201,14 @@ public abstract static class Builder {
public ConnectionWorkerPool(
long maxInflightRequests,
long maxInflightBytes,
java.time.Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
BigQueryWriteClient client,
boolean ownsBigQueryWriteClient) {
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.maxRetryDuration = maxRetryDuration;
this.limitExceededBehavior = limitExceededBehavior;
this.traceId = traceId;
this.client = client;
Expand Down Expand Up @@ -356,6 +363,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
writeSchema,
maxInflightRequests,
maxInflightBytes,
maxRetryDuration,
limitExceededBehavior,
traceId,
client,
Expand Down
Expand Up @@ -29,6 +29,7 @@
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
Expand Down Expand Up @@ -193,6 +194,7 @@ private StreamWriter(Builder builder) throws IOException {
builder.writerSchema,
builder.maxInflightRequest,
builder.maxInflightBytes,
builder.maxRetryDuration,
builder.limitExceededBehavior,
builder.traceId,
getBigQueryWriteClient(builder),
Expand Down Expand Up @@ -251,6 +253,7 @@ private StreamWriter(Builder builder) throws IOException {
return new ConnectionWorkerPool(
builder.maxInflightRequest,
builder.maxInflightBytes,
builder.maxRetryDuration,
builder.limitExceededBehavior,
builder.traceId,
client,
Expand Down Expand Up @@ -494,6 +497,8 @@ public static final class Builder {

private boolean enableConnectionPool = false;

private java.time.Duration maxRetryDuration = Duration.ofMinutes(5);

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
Expand Down Expand Up @@ -602,6 +607,15 @@ public Builder setLimitExceededBehavior(
return this;
}

/*
* Max duration to retry on retryable errors. Default is 5 minutes. You can allow unlimited
* retry by setting the value to be 0.
*/
public Builder setMaxRetryDuration(java.time.Duration maxRetryDuration) {
this.maxRetryDuration = maxRetryDuration;
return this;
}

/** Builds the {@code StreamWriterV2}. */
public StreamWriter build() throws IOException {
return new StreamWriter(this);
Expand Down
Expand Up @@ -153,7 +153,8 @@ private void testSendRequestsToMultiTable(
.setMaxConnectionsPerRegion(maxConnections)
.build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(maxRequests, /*maxBytes=*/ 100000);
createConnectionWorkerPool(
maxRequests, /*maxBytes=*/ 100000, java.time.Duration.ofSeconds(5));

// Sets the sleep time to simulate requests stuck in connection.
testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
Expand Down Expand Up @@ -206,7 +207,8 @@ public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 1000);
createConnectionWorkerPool(
/*maxRequests=*/ 3, /*maxBytes=*/ 1000, java.time.Duration.ofSeconds(5));

// Sets the sleep time to simulate requests stuck in connection.
testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
Expand Down Expand Up @@ -255,7 +257,8 @@ public void testMultiStreamAppend_appendWhileClosing() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 100000);
createConnectionWorkerPool(
/*maxRequests=*/ 3, /*maxBytes=*/ 100000, java.time.Duration.ofSeconds(5));

// Sets the sleep time to simulate requests stuck in connection.
testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
Expand Down Expand Up @@ -368,11 +371,13 @@ private ProtoRows createProtoRows(String[] messages) {
return rowsBuilder.build();
}

ConnectionWorkerPool createConnectionWorkerPool(long maxRequests, long maxBytes) {
ConnectionWorkerPool createConnectionWorkerPool(
long maxRequests, long maxBytes, java.time.Duration maxRetryDuration) {
ConnectionWorkerPool.enableTestingLogic();
return new ConnectionWorkerPool(
maxRequests,
maxBytes,
maxRetryDuration,
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
client,
Expand Down
Expand Up @@ -290,16 +290,23 @@ private AppendRowsResponse createAppendResponse(long offset) {

private ConnectionWorker createConnectionWorker() throws IOException {
// By default use only the first table as table reference.
return createConnectionWorker(TEST_STREAM_1, TEST_TRACE_ID, 100, 1000);
return createConnectionWorker(
TEST_STREAM_1, TEST_TRACE_ID, 100, 1000, java.time.Duration.ofSeconds(5));
}

private ConnectionWorker createConnectionWorker(
String streamName, String traceId, long maxRequests, long maxBytes) throws IOException {
String streamName,
String traceId,
long maxRequests,
long maxBytes,
java.time.Duration maxRetryDuration)
throws IOException {
return new ConnectionWorker(
streamName,
createProtoSchema("foo"),
maxRequests,
maxBytes,
maxRetryDuration,
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
client,
Expand Down
Expand Up @@ -91,6 +91,10 @@ public void setTimesToClose(long numberTimesToClose) {
serviceImpl.setTimesToClose(numberTimesToClose);
}

public void setCloseForeverAfter(long closeForeverAfter) {
serviceImpl.setCloseForeverAfter(closeForeverAfter);
}

public long getConnectionCount() {
return serviceImpl.getConnectionCount();
}
Expand Down
Expand Up @@ -57,6 +57,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
private long closeAfter = 0;
private long recordCount = 0;
private long connectionCount = 0;
private long closeForeverAfter = 0;

// Record whether the first record has been seen on a connection.
private final Map<StreamObserver<AppendRowsResponse>, Boolean> connectionToFirstRequest =
Expand Down Expand Up @@ -177,6 +178,9 @@ public void onNext(AppendRowsRequest value) {
&& (numberTimesToClose == 0 || connectionCount <= numberTimesToClose)) {
LOG.info("Shutting down connection from test...");
responseObserver.onError(Status.ABORTED.asException());
} else if (closeForeverAfter > 0 && recordCount > closeForeverAfter) {
LOG.info("Shutting down connection from test...");
responseObserver.onError(Status.ABORTED.asException());
} else {
final Response response = responses.get(offset);
sendResponse(response, responseObserver);
Expand Down Expand Up @@ -279,4 +283,10 @@ public void setCloseEveryNAppends(long closeAfter) {
public void setTimesToClose(long numberTimesToClose) {
this.numberTimesToClose = numberTimesToClose;
}

/* The connection will forever return failure after numberTimesToClose. This option shouldn't
* be used together with setCloseEveryNAppends and setTimesToClose*/
public void setCloseForeverAfter(long closeForeverAfter) {
this.closeForeverAfter = closeForeverAfter;
}
}
Expand Up @@ -25,6 +25,7 @@
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.rpc.AbortedException;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.UnknownException;
Expand Down Expand Up @@ -129,13 +130,15 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
.setTraceId(TEST_TRACE_ID)
.setLocation("US")
.setEnableConnectionPool(true)
.setMaxRetryDuration(java.time.Duration.ofSeconds(5))
.build();
}

private StreamWriter getTestStreamWriter() throws IOException {
return StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.setMaxRetryDuration(java.time.Duration.ofSeconds(5))
.build();
}

Expand Down Expand Up @@ -884,6 +887,48 @@ public void testAppendWithResetSuccess() throws Exception {
}
}

@Test
public void testAppendWithResetNeverSuccess() throws Exception {
try (StreamWriter writer = getTestStreamWriter()) {
testBigQueryWrite.setCloseForeverAfter(1);
long appendCount = 100;
for (long i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
for (long i = 0; i < appendCount; i++) {
futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)}, i));
}
// first request succeeded.
assertEquals(futures.get(0).get().getAppendResult().getOffset().getValue(), 0);
// after 5 seconds, the requests will bail out.
for (int i = 1; i < appendCount; i++) {
assertFutureException(AbortedException.class, futures.get(i));
}
}
}

@Test
public void testAppendWithResetNeverSuccessWithMultiplexing() throws Exception {
try (StreamWriter writer = getMultiplexingTestStreamWriter()) {
testBigQueryWrite.setCloseForeverAfter(1);
long appendCount = 100;
for (long i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
for (long i = 0; i < appendCount; i++) {
futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)}, i));
}
// first request succeeded.
assertEquals(futures.get(0).get().getAppendResult().getOffset().getValue(), 0);
// after 5 seconds, the requests will bail out.
for (int i = 1; i < appendCount; i++) {
assertFutureException(AbortedException.class, futures.get(i));
}
}
}

// This test is setup for the server to force a retry after all records are sent. Ensure the
// records are resent, even if no new records are appeneded.
@Test
Expand Down

0 comments on commit 2d648cf

Please sign in to comment.