diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index b3b2c19199..0060ad7314 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -378,6 +378,11 @@ public String getWriterId() { return writerId; } + boolean isConnectionInUnrecoverableState() { + // If final status is set, there's no + return connectionFinalStatus != null; + } + /** Close the stream writer. Shut down all resources. */ @Override public void close() { diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index 0e6b5eab3a..c4e68bb189 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -234,9 +234,17 @@ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, streamWriter, (key, existingStream) -> { // Stick to the existing stream if it's not overwhelmed. - if (existingStream != null && !existingStream.getLoad().isOverwhelmed()) { + if (existingStream != null + && !existingStream.getLoad().isOverwhelmed() + && !existingStream.isConnectionInUnrecoverableState()) { return existingStream; } + if (existingStream != null && existingStream.isConnectionInUnrecoverableState()) { + existingStream = null; + } + // Before search for the next connection to attach, clear the finalized connections + // first so that they will not be selected. + clearFinalizedConnectionWorker(); // Try to create or find another existing stream to reuse. ConnectionWorker createdOrExistingConnection = null; try { @@ -299,7 +307,6 @@ private ConnectionWorker createOrReuseConnectionWorker( } return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema()); } else { - // Stick to the original connection if all the connections are overwhelmed. if (existingConnectionWorker != null) { return existingConnectionWorker; @@ -310,6 +317,18 @@ private ConnectionWorker createOrReuseConnectionWorker( } } + private void clearFinalizedConnectionWorker() { + Set connectionWorkerSet = new HashSet<>(); + for (ConnectionWorker existingWorker : connectionWorkerPool) { + if (existingWorker.isConnectionInUnrecoverableState()) { + connectionWorkerSet.add(existingWorker); + } + } + for (ConnectionWorker workerToRemove : connectionWorkerSet) { + connectionWorkerPool.remove(workerToRemove); + } + } + /** Select out the best connection worker among the given connection workers. */ static ConnectionWorker pickBestLoadConnection( Comparator comparator, List connectionWorkerList) { diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index edc7240ad7..337ff86a66 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.time.Duration; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -450,6 +451,15 @@ static void cleanUp() { connectionPoolMap.clear(); } + @VisibleForTesting + ConnectionWorkerPool getTestOnlyConnectionWorkerPool() { + ConnectionWorkerPool connectionWorkerPool = null; + for (Entry entry : connectionPoolMap.entrySet()) { + connectionWorkerPool = entry.getValue(); + } + return connectionWorkerPool; + } + /** A builder of {@link StreamWriter}s. */ public static final class Builder { private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index eacfdcb40f..6426bc6ca1 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -69,6 +69,7 @@ public class StreamWriterTest { private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName()); private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/_default"; private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default"; + private static final String TEST_STREAM_3 = "projects/p/datasets/d3/tables/t3/streams/_default"; private static final String TEST_STREAM_SHORTEN = "projects/p/datasets/d2/tables/t2/_default"; private static final String EXPLICIT_STEAM = "projects/p/datasets/d1/tables/t1/streams/s1"; private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; @@ -1090,6 +1091,115 @@ public void testExtractDatasetName() throws Exception { Assert.assertTrue(ex.getMessage().contains("The passed in stream name does not match")); } + @Test + public void testRetryInUnrecoverableStatus_MultiplexingCase() throws Exception { + ConnectionWorkerPool.setOptions( + Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(4).build()); + ConnectionWorkerPool.enableTestingLogic(); + + // Setup: create three stream writers, two of them are writing to the same stream. + // Those four stream writers should be assigned to the same connection. + // 1. Submit three requests at first to trigger connection retry limitation. + // 2. At this point the connection should be entering a unrecoverable state. + // 3. Further submit requests to those stream writers would trigger connection reassignment. + StreamWriter writer1 = getMultiplexingStreamWriter(TEST_STREAM_1); + StreamWriter writer2 = getMultiplexingStreamWriter(TEST_STREAM_2); + StreamWriter writer3 = getMultiplexingStreamWriter(TEST_STREAM_3); + StreamWriter writer4 = getMultiplexingStreamWriter(TEST_STREAM_3); + + testBigQueryWrite.setCloseForeverAfter(2); + testBigQueryWrite.setTimesToClose(1); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + // Connection will be failed after triggering the third append. + ApiFuture appendFuture1 = sendTestMessage(writer1, new String[] {"A"}, 0); + ApiFuture appendFuture2 = sendTestMessage(writer2, new String[] {"B"}, 1); + ApiFuture appendFuture3 = sendTestMessage(writer3, new String[] {"C"}, 2); + TimeUnit.SECONDS.sleep(1); + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue()); + assertThrows( + ExecutionException.class, + () -> { + assertEquals(2, appendFuture3.get().getAppendResult().getOffset().getValue()); + }); + assertEquals(writer1.getTestOnlyConnectionWorkerPool().getTotalConnectionCount(), 1); + assertEquals(writer1.getTestOnlyConnectionWorkerPool().getCreateConnectionCount(), 1); + + // Insert another request to the writer attached to closed connection would create another + // connection. + + testBigQueryWrite.setCloseForeverAfter(0); + testBigQueryWrite.addResponse(createAppendResponse(4)); + testBigQueryWrite.addResponse(createAppendResponse(5)); + testBigQueryWrite.addResponse(createAppendResponse(6)); + ApiFuture appendFuture4 = sendTestMessage(writer4, new String[] {"A"}, 2); + ApiFuture appendFuture5 = sendTestMessage(writer1, new String[] {"A"}, 3); + ApiFuture appendFuture6 = sendTestMessage(writer2, new String[] {"B"}, 4); + assertEquals(4, appendFuture4.get().getAppendResult().getOffset().getValue()); + assertEquals(5, appendFuture5.get().getAppendResult().getOffset().getValue()); + assertEquals(6, appendFuture6.get().getAppendResult().getOffset().getValue()); + assertEquals(writer1.getTestOnlyConnectionWorkerPool().getTotalConnectionCount(), 1); + assertEquals(writer1.getTestOnlyConnectionWorkerPool().getCreateConnectionCount(), 2); + + writer1.close(); + writer2.close(); + writer3.close(); + writer4.close(); + assertEquals(writer1.getTestOnlyConnectionWorkerPool().getTotalConnectionCount(), 0); + } + + @Test + public void testCloseWhileInUnrecoverableState() throws Exception { + ConnectionWorkerPool.setOptions( + Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(4).build()); + ConnectionWorkerPool.enableTestingLogic(); + + // Setup: create three stream writers + // 1. Submit three requests at first to trigger connection retry limitation. + // 2. Submit request to writer3 to trigger reassignment + // 3. Close the previous two writers would be succesful + StreamWriter writer1 = getMultiplexingStreamWriter(TEST_STREAM_1); + StreamWriter writer2 = getMultiplexingStreamWriter(TEST_STREAM_2); + StreamWriter writer3 = getMultiplexingStreamWriter(TEST_STREAM_3); + + testBigQueryWrite.setCloseForeverAfter(2); + testBigQueryWrite.setTimesToClose(1); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + // Connection will be failed after triggering the third append. + ApiFuture appendFuture1 = sendTestMessage(writer1, new String[] {"A"}, 0); + ApiFuture appendFuture2 = sendTestMessage(writer2, new String[] {"B"}, 1); + ApiFuture appendFuture3 = sendTestMessage(writer3, new String[] {"C"}, 2); + TimeUnit.SECONDS.sleep(1); + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue()); + assertThrows( + ExecutionException.class, + () -> { + assertEquals(2, appendFuture3.get().getAppendResult().getOffset().getValue()); + }); + assertEquals(writer1.getTestOnlyConnectionWorkerPool().getTotalConnectionCount(), 1); + assertEquals(writer1.getTestOnlyConnectionWorkerPool().getCreateConnectionCount(), 1); + + writer1.close(); + writer2.close(); + // We will still be left with one request + assertEquals(writer1.getTestOnlyConnectionWorkerPool().getCreateConnectionCount(), 1); + } + + public StreamWriter getMultiplexingStreamWriter(String streamName) throws IOException { + return StreamWriter.newBuilder(streamName, client) + .setWriterSchema(createProtoSchema()) + .setEnableConnectionPool(true) + .setMaxInflightRequests(10) + .setLocation("US") + .setMaxRetryDuration(java.time.Duration.ofMillis(100)) + .build(); + } + // Timeout to ensure close() doesn't wait for done callback timeout. @Test(timeout = 10000) public void testCloseDisconnectedStream() throws Exception {