From bd895567fe33735294065d7043d845f14f33f8a8 Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Mon, 12 Dec 2022 10:02:52 -0800 Subject: [PATCH] feat: throw error when using connection pool for explicit stream (#1903) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destinationt * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting thre response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: wire multiplexing connection pool to stream writer * feat: some fixes for multiplexing client * feat: fix some todos, and reject the mixed behavior of passed in client or not * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * feat: add getInflightWaitSeconds implementation * feat: Add schema comparision in connection loop to ensure schema update for the same stream name can be notified * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: add schema update support to multiplexing * fix: fix windows build bug: windows Instant resolution is different with linux * fix: fix another failing tests for windows build * fix: fix another test failure for Windows build * feat: Change new thread for each retry to be a thread pool to avoid create/tear down too much threads if lots of retries happens * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: add back the background executor provider that's accidentally removed * feat: throw error when use connection pool for explicit stream Co-authored-by: Owl Bot --- .../bigquery/storage/v1/StreamWriter.java | 21 +++++++++++-- .../storage/v1/JsonStreamWriterTest.java | 4 +-- .../bigquery/storage/v1/StreamWriterTest.java | 31 +++++++++++++++++-- 3 files changed, 50 insertions(+), 6 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index c9c27fae9e..4d07dfdd91 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -47,7 +47,10 @@ public class StreamWriter implements AutoCloseable { private static final Logger log = Logger.getLogger(StreamWriter.class.getName()); private static String datasetsMatching = "projects/[^/]+/datasets/[^/]+/"; - private static Pattern streamPattern = Pattern.compile(datasetsMatching); + private static Pattern streamPatternDatasets = Pattern.compile(datasetsMatching); + + private static String defaultStreamMatching = "/_default"; + private static Pattern streamPatternDefaultStream = Pattern.compile(defaultStreamMatching); // Cache of location info for a given dataset. private static Map projectAndDatasetToLocation = new ConcurrentHashMap<>(); @@ -195,6 +198,14 @@ private StreamWriter(Builder builder) throws IOException { getBigQueryWriteClient(builder), ownsBigQueryWriteClient)); } else { + if (!isDefaultStream(streamName)) { + log.warning( + "Connection pool is only allowed in default stream! However received " + + builder.streamName); + throw new IllegalArgumentException( + "Trying to enable connection pool in non-default stream."); + } + BigQueryWriteClient client = getBigQueryWriteClient(builder); String location = builder.location; if (location == null || location.isEmpty()) { @@ -264,7 +275,7 @@ private StreamWriter(Builder builder) throws IOException { @VisibleForTesting static String extractDatasetAndProjectName(String streamName) { - Matcher streamMatcher = streamPattern.matcher(streamName); + Matcher streamMatcher = streamPatternDatasets.matcher(streamName); if (streamMatcher.find()) { return streamMatcher.group(); } else { @@ -273,6 +284,12 @@ static String extractDatasetAndProjectName(String streamName) { } } + @VisibleForTesting + static boolean isDefaultStream(String streamName) { + Matcher streamMatcher = streamPatternDefaultStream.matcher(streamName); + return streamMatcher.find(); + } + private BigQueryWriteClient getBigQueryWriteClient(Builder builder) throws IOException { if (builder.client == null) { BigQueryWriteSettings stubSettings = diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 258a287a1c..8c34ad9b3c 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -63,8 +63,8 @@ @RunWith(JUnit4.class) public class JsonStreamWriterTest { private static final Logger LOG = Logger.getLogger(JsonStreamWriterTest.class.getName()); - private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s"; - private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/s2"; + private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/_default"; + private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default"; private static final String TEST_TABLE = "projects/p/datasets/d/tables/t"; private static final ExecutorProvider SINGLE_THREAD_EXECUTOR = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 134b438593..50e43fe45d 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -62,8 +62,10 @@ @RunWith(JUnit4.class) public class StreamWriterTest { private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName()); - private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/s1"; - private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/s2"; + private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/_default"; + private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default"; + private static final String TEST_STREAM_SHORTEN = "projects/p/datasets/d2/tables/t2/_default"; + private static final String EXPLICIT_STEAM = "projects/p/datasets/d1/tables/t1/streams/s1"; private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; private FakeScheduledExecutorService fakeExecutor; private FakeBigQueryWrite testBigQueryWrite; @@ -366,6 +368,31 @@ public void run() throws Throwable { }); } + @Test + public void testEnableConnectionPoolOnExplicitStream() throws Exception { + IllegalArgumentException ex = + assertThrows( + IllegalArgumentException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + StreamWriter.newBuilder(EXPLICIT_STEAM, client) + .setEnableConnectionPool(true) + .build(); + } + }); + assertTrue(ex.getMessage().contains("Trying to enable connection pool in non-default stream.")); + } + + @Test + public void testShortenStreamNameAllowed() throws Exception { + // no exception is thrown. + StreamWriter.newBuilder(TEST_STREAM_SHORTEN, client) + .setEnableConnectionPool(true) + .setLocation("us") + .build(); + } + @Test public void testAppendSuccessAndConnectionError() throws Exception { StreamWriter writer = getTestStreamWriter();