diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index c9c27fae9e..4d07dfdd91 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -47,7 +47,10 @@ public class StreamWriter implements AutoCloseable { private static final Logger log = Logger.getLogger(StreamWriter.class.getName()); private static String datasetsMatching = "projects/[^/]+/datasets/[^/]+/"; - private static Pattern streamPattern = Pattern.compile(datasetsMatching); + private static Pattern streamPatternDatasets = Pattern.compile(datasetsMatching); + + private static String defaultStreamMatching = "/_default"; + private static Pattern streamPatternDefaultStream = Pattern.compile(defaultStreamMatching); // Cache of location info for a given dataset. private static Map projectAndDatasetToLocation = new ConcurrentHashMap<>(); @@ -195,6 +198,14 @@ private StreamWriter(Builder builder) throws IOException { getBigQueryWriteClient(builder), ownsBigQueryWriteClient)); } else { + if (!isDefaultStream(streamName)) { + log.warning( + "Connection pool is only allowed in default stream! However received " + + builder.streamName); + throw new IllegalArgumentException( + "Trying to enable connection pool in non-default stream."); + } + BigQueryWriteClient client = getBigQueryWriteClient(builder); String location = builder.location; if (location == null || location.isEmpty()) { @@ -264,7 +275,7 @@ private StreamWriter(Builder builder) throws IOException { @VisibleForTesting static String extractDatasetAndProjectName(String streamName) { - Matcher streamMatcher = streamPattern.matcher(streamName); + Matcher streamMatcher = streamPatternDatasets.matcher(streamName); if (streamMatcher.find()) { return streamMatcher.group(); } else { @@ -273,6 +284,12 @@ static String extractDatasetAndProjectName(String streamName) { } } + @VisibleForTesting + static boolean isDefaultStream(String streamName) { + Matcher streamMatcher = streamPatternDefaultStream.matcher(streamName); + return streamMatcher.find(); + } + private BigQueryWriteClient getBigQueryWriteClient(Builder builder) throws IOException { if (builder.client == null) { BigQueryWriteSettings stubSettings = diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 258a287a1c..8c34ad9b3c 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -63,8 +63,8 @@ @RunWith(JUnit4.class) public class JsonStreamWriterTest { private static final Logger LOG = Logger.getLogger(JsonStreamWriterTest.class.getName()); - private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s"; - private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/s2"; + private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/_default"; + private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default"; private static final String TEST_TABLE = "projects/p/datasets/d/tables/t"; private static final ExecutorProvider SINGLE_THREAD_EXECUTOR = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 134b438593..50e43fe45d 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -62,8 +62,10 @@ @RunWith(JUnit4.class) public class StreamWriterTest { private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName()); - private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/s1"; - private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/s2"; + private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/_default"; + private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default"; + private static final String TEST_STREAM_SHORTEN = "projects/p/datasets/d2/tables/t2/_default"; + private static final String EXPLICIT_STEAM = "projects/p/datasets/d1/tables/t1/streams/s1"; private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; private FakeScheduledExecutorService fakeExecutor; private FakeBigQueryWrite testBigQueryWrite; @@ -366,6 +368,31 @@ public void run() throws Throwable { }); } + @Test + public void testEnableConnectionPoolOnExplicitStream() throws Exception { + IllegalArgumentException ex = + assertThrows( + IllegalArgumentException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + StreamWriter.newBuilder(EXPLICIT_STEAM, client) + .setEnableConnectionPool(true) + .build(); + } + }); + assertTrue(ex.getMessage().contains("Trying to enable connection pool in non-default stream.")); + } + + @Test + public void testShortenStreamNameAllowed() throws Exception { + // no exception is thrown. + StreamWriter.newBuilder(TEST_STREAM_SHORTEN, client) + .setEnableConnectionPool(true) + .setLocation("us") + .build(); + } + @Test public void testAppendSuccessAndConnectionError() throws Exception { StreamWriter writer = getTestStreamWriter();