Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add public api to stream writer to set the maximum wait time #2066

Merged
merged 5 commits into from Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Expand Up @@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.34.1'
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.34.2'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.34.1"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.34.2"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -219,7 +219,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigquerystorage/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigquerystorage.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/2.34.1
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/2.34.2
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
Expand Up @@ -75,7 +75,7 @@ class ConnectionWorker implements AutoCloseable {
* We will constantly checking how much time we have been waiting for the next request callback
* if we wait too much time we will start shutting down the connections and clean up the queues.
*/
private static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15);
static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15);

private Lock lock;
private Condition hasMessageInWaitingQueue;
Expand Down
Expand Up @@ -518,6 +518,16 @@ public synchronized TableSchema getUpdatedSchema() {
: null;
}

/**
* Sets the maximum time a request is allowed to be waiting in request waiting queue. Under very
* low chance, it's possible for append request to be waiting indefintely for request callback
* when Google networking SDK does not detect the networking breakage. The default timeout is 15
* minutes. We are investigating the root cause for callback not triggered by networking SDK.
*/
public static void setMaxRequestCallbackWaitTime(Duration waitTime) {
ConnectionWorker.MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = waitTime;
}

long getCreationTimestamp() {
return creationTimestamp;
}
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigquery.storage.v1;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -113,6 +114,7 @@ public StreamWriterTest() throws DescriptorValidationException {}
@Before
public void setUp() throws Exception {
testBigQueryWrite = new FakeBigQueryWrite();
StreamWriter.setMaxRequestCallbackWaitTime(java.time.Duration.ofSeconds(10000));
ConnectionWorker.setMaxInflightQueueWaitTime(300000);
serviceHelper =
new MockServiceHelper(
Expand Down Expand Up @@ -947,6 +949,35 @@ public void testMessageTooLarge() throws Exception {
writer.close();
}

@Test
public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception {
ProtoSchema schema1 = createProtoSchema("foo");
StreamWriter.setMaxRequestCallbackWaitTime(java.time.Duration.ofSeconds(1));
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3));

long appendCount = 10;
for (int i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}

// In total insert 5 requests,
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
for (int i = 0; i < appendCount; i++) {
futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i));
}

for (int i = 0; i < appendCount; i++) {
int finalI = i;
ExecutionException ex =
assertThrows(
ExecutionException.class,
() -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
}
}

@Test
public void testAppendWithResetSuccess() throws Exception {
try (StreamWriter writer = getTestStreamWriter()) {
Expand Down