Skip to content

Commit

Permalink
fix: fix WriteToDefaultStream example code to close the client proper…
Browse files Browse the repository at this point in the history
…ly (#2433)

* fix: fix channel not shut down properly exception.

Client being created has to be properly closed, otherwise during garbage
collection an error will be reported showing channel not shutdown
properly

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
GaoleMeng and gcf-owl-bot[bot] committed Mar 7, 2024
1 parent f8535be commit d8a52f8
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ private static class DataWriter {

private static final int MAX_RECREATE_COUNT = 3;

private BigQueryWriteClient client;

// Track the number of in-flight requests to wait for all responses before shutting down.
private final Phaser inflightRequestCount = new Phaser(1);
private final Object lock = new Object();
Expand All @@ -163,12 +165,16 @@ public void initialize(TableName parentTable)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();

// Initialize client without settings, internally within stream writer a new client will be
// created with full settings.
client = BigQueryWriteClient.create();

// Use the JSON stream writer to send records in JSON format. Specify the table name to write
// to the default stream.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
streamWriter =
JsonStreamWriter.newBuilder(parentTable.toString(), BigQueryWriteClient.create())
JsonStreamWriter.newBuilder(parentTable.toString(), client)
.setExecutorProvider(
FixedExecutorProvider.create(Executors.newScheduledThreadPool(100)))
.setChannelProvider(
Expand All @@ -193,10 +199,7 @@ public void append(AppendContext appendContext)
if (!streamWriter.isUserClosed()
&& streamWriter.isClosed()
&& recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) {
streamWriter =
JsonStreamWriter.newBuilder(
streamWriter.getStreamName(), BigQueryWriteClient.create())
.build();
streamWriter = JsonStreamWriter.newBuilder(streamWriter.getStreamName(), client).build();
this.error = null;
}
// If earlier appends have failed, we need to reset before continuing.
Expand All @@ -217,6 +220,7 @@ public void cleanup() {
// Wait for all in-flight requests to complete.
inflightRequestCount.arriveAndAwaitAdvance();

client.close();
// Close the connection to the server.
streamWriter.close();

Expand Down

0 comments on commit d8a52f8

Please sign in to comment.