Skip to content

Commit

Permalink
fix: Allow StreamWriter settings to override passed in BQ client sett…
Browse files Browse the repository at this point in the history
…ing (#2001)

* .

* fix: If client is provided, allow BQ client settings to be overridable

* .

* .

* .

* .

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
yirutang and gcf-owl-bot[bot] committed Feb 23, 2023
1 parent f03ffc7 commit 66db8fe
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 19 deletions.
6 changes: 3 additions & 3 deletions README.md
Expand Up @@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.8.0')
implementation platform('com.google.cloud:libraries-bom:26.9.0')
implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.32.0'
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.32.1'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.32.0"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.32.1"
```

## Authentication
Expand Down
Expand Up @@ -308,17 +308,37 @@ static boolean isDefaultStream(String streamName) {
return streamMatcher.find();
}

private BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException {
static BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException {
BigQueryWriteSettings.Builder settingsBuilder = null;
if (builder.client != null) {
return builder.client.getSettings();
settingsBuilder = builder.client.getSettings().toBuilder();
} else {
return BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setTransportChannelProvider(builder.channelProvider)
.setBackgroundExecutorProvider(builder.executorProvider)
.setEndpoint(builder.endpoint)
.build();
settingsBuilder =
new BigQueryWriteSettings.Builder()
.setTransportChannelProvider(
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
.setChannelsPerCpu(1)
.build())
.setCredentialsProvider(
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build())
.setBackgroundExecutorProvider(
BigQueryWriteSettings.defaultExecutorProviderBuilder().build())
.setEndpoint(BigQueryWriteSettings.getDefaultEndpoint());
}
if (builder.channelProvider != null) {
settingsBuilder.setTransportChannelProvider(builder.channelProvider);
}
if (builder.credentialsProvider != null) {
settingsBuilder.setCredentialsProvider(builder.credentialsProvider);
}
if (builder.executorProvider != null) {
settingsBuilder.setBackgroundExecutorProvider(builder.executorProvider);
}
if (builder.endpoint != null) {
settingsBuilder.setEndpoint(builder.endpoint);
}

return settingsBuilder.build();
}

// Validate whether the fetched connection pool matched certain properties.
Expand Down Expand Up @@ -542,16 +562,13 @@ public static final class Builder {

private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES;

private String endpoint = BigQueryWriteSettings.getDefaultEndpoint();
private String endpoint = null;

private TransportChannelProvider channelProvider =
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
private TransportChannelProvider channelProvider = null;

private CredentialsProvider credentialsProvider =
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();
private CredentialsProvider credentialsProvider = null;

private ExecutorProvider executorProvider =
BigQueryWriteSettings.defaultExecutorProviderBuilder().build();
private ExecutorProvider executorProvider = null;

private FlowController.LimitExceededBehavior limitExceededBehavior =
FlowController.LimitExceededBehavior.Block;
Expand Down Expand Up @@ -633,7 +650,8 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {

/** {@code ExecutorProvider} to use to create Executor to run background jobs. */
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
this.executorProvider =
Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null.");
return this;
}

Expand Down
Expand Up @@ -25,7 +25,10 @@
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.GoogleCredentialsProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.rpc.AbortedException;
Expand Down Expand Up @@ -1366,4 +1369,79 @@ public void testStreamWriterPermanentErrorNoMultiplexing() throws Exception {
assertTrue(ex.getCause() instanceof InvalidArgumentException);
assertFalse(writer.isUserClosed());
}

@Test(timeout = 10000)
public void testBuilderDefaultSetting() throws Exception {
StreamWriter.Builder writerBuilder = StreamWriter.newBuilder(TEST_STREAM_1);
BigQueryWriteSettings writeSettings = StreamWriter.getBigQueryWriteSettings(writerBuilder);
assertEquals(
BigQueryWriteSettings.defaultExecutorProviderBuilder().build().toString(),
writeSettings.getBackgroundExecutorProvider().toString());
assertEquals(
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build().toString(),
writeSettings.getCredentialsProvider().toString());
assertTrue(
writeSettings.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider);
assertEquals(
BigQueryWriteSettings.getDefaultEndpoint(), writeSettings.getEndpoint().toString());
}

@Test(timeout = 10000)
public void testBuilderExplicitSetting() throws Exception {
// Client has special seetings.
BigQueryWriteSettings clientSettings =
BigQueryWriteSettings.newBuilder()
.setEndpoint("xxx:345")
.setBackgroundExecutorProvider(
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build())
.setTransportChannelProvider(serviceHelper.createChannelProvider())
.setCredentialsProvider(NoCredentialsProvider.create())
.build();
BigQueryWriteClient client = BigQueryWriteClient.create(clientSettings);
StreamWriter.Builder writerWithClient = StreamWriter.newBuilder(TEST_STREAM_1, client);
BigQueryWriteSettings writerSettings = StreamWriter.getBigQueryWriteSettings(writerWithClient);
assertEquals("xxx:345", writerSettings.getEndpoint());
assertTrue(
writerSettings.getBackgroundExecutorProvider() instanceof InstantiatingExecutorProvider);
assertEquals(
4,
((InstantiatingExecutorProvider) writerSettings.getBackgroundExecutorProvider())
.getExecutorThreadCount());

// Explicit setting on StreamWriter is respected.
StreamWriter.Builder writerWithClientWithOverrides =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setEndpoint("yyy:345")
.setExecutorProvider(
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(14).build())
.setChannelProvider(
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
.setKeepAliveTimeout(Duration.ofSeconds(500))
.build())
.setCredentialsProvider(
BigQueryWriteSettings.defaultCredentialsProviderBuilder()
.setScopesToApply(Arrays.asList("A", "B"))
.build());
BigQueryWriteSettings writerSettings2 =
StreamWriter.getBigQueryWriteSettings(writerWithClientWithOverrides);
assertEquals("yyy:345", writerSettings2.getEndpoint());
assertTrue(
writerSettings2.getBackgroundExecutorProvider() instanceof InstantiatingExecutorProvider);
assertEquals(
14,
((InstantiatingExecutorProvider) writerSettings2.getBackgroundExecutorProvider())
.getExecutorThreadCount());
assertTrue(
writerSettings2.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider);
assertEquals(
Duration.ofSeconds(500),
((InstantiatingGrpcChannelProvider) writerSettings2.getTransportChannelProvider())
.getKeepAliveTimeout());
assertTrue(writerSettings2.getCredentialsProvider() instanceof GoogleCredentialsProvider);
assertEquals(
2,
((GoogleCredentialsProvider) writerSettings2.getCredentialsProvider())
.getScopesToApply()
.size());
}
}

0 comments on commit 66db8fe

Please sign in to comment.