diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index 8bd384c325..0266b6ae9d 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.util.Map; import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.concurrent.GuardedBy; import org.json.JSONArray; import org.json.JSONObject; @@ -123,6 +124,7 @@ private static class AppendContext { private static class DataWriter { private static final int MAX_RETRY_COUNT = 3; + private static final int MAX_RECREATE_COUNT = 3; private static final ImmutableList RETRIABLE_ERROR_CODES = ImmutableList.of( Code.INTERNAL, @@ -140,6 +142,8 @@ private static class DataWriter { @GuardedBy("lock") private RuntimeException error = null; + private AtomicInteger recreateCount = new AtomicInteger(0); + public void initialize(TableName parentTable) throws DescriptorValidationException, IOException, InterruptedException { // Use the JSON stream writer to send records in JSON format. Specify the table name to write @@ -151,8 +155,17 @@ public void initialize(TableName parentTable) } public void append(AppendContext appendContext) - throws DescriptorValidationException, IOException { + throws DescriptorValidationException, IOException, InterruptedException { synchronized (this.lock) { + if (!streamWriter.isUserClosed() + && streamWriter.isClosed() + && recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) { + streamWriter = + JsonStreamWriter.newBuilder( + streamWriter.getStreamName(), BigQueryWriteClient.create()) + .build(); + this.error = null; + } // If earlier appends have failed, we need to reset before continuing. if (this.error != null) { throw this.error; @@ -194,6 +207,7 @@ public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) { public void onSuccess(AppendRowsResponse response) { System.out.format("Append success\n"); + this.parent.recreateCount.set(0); done(); } @@ -241,6 +255,8 @@ public void onFailure(Throwable throwable) { throw new RuntimeException(e); } catch (IOException e) { throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } // Mark the existing attempt as done since we got a response for it