Skip to content

Commit

Permalink
feat: throw error when using connection pool for explicit stream (#1903)
Browse files Browse the repository at this point in the history
* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destination

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests
also fixed a tiny bug inside fake bigquery write impl for getting the
response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some todos, and reject the mixed behavior of passed in client or not

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* feat: add getInflightWaitSeconds implementation

* feat: Add schema comparison in connection loop to ensure schema updates for
the same stream name can be notified

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: add schema update support to multiplexing

* fix: fix windows build bug: the Windows Instant resolution is different from
Linux's

* fix: fix another failing tests for windows build

* fix: fix another test failure for Windows build

* feat: Change new thread for each retry to be a thread pool to avoid
creating/tearing down too many threads if lots of retries happen

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: add back the background executor provider that's accidentally
removed

* feat: throw error when using connection pool for explicit stream

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
GaoleMeng and gcf-owl-bot[bot] committed Dec 12, 2022
1 parent 3c26596 commit bd89556
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 6 deletions.
Expand Up @@ -47,7 +47,10 @@ public class StreamWriter implements AutoCloseable {
private static final Logger log = Logger.getLogger(StreamWriter.class.getName());

private static String datasetsMatching = "projects/[^/]+/datasets/[^/]+/";
private static Pattern streamPattern = Pattern.compile(datasetsMatching);
private static Pattern streamPatternDatasets = Pattern.compile(datasetsMatching);

private static String defaultStreamMatching = "/_default";
private static Pattern streamPatternDefaultStream = Pattern.compile(defaultStreamMatching);

// Cache of location info for a given dataset.
private static Map<String, String> projectAndDatasetToLocation = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -195,6 +198,14 @@ private StreamWriter(Builder builder) throws IOException {
getBigQueryWriteClient(builder),
ownsBigQueryWriteClient));
} else {
if (!isDefaultStream(streamName)) {
log.warning(
"Connection pool is only allowed in default stream! However received "
+ builder.streamName);
throw new IllegalArgumentException(
"Trying to enable connection pool in non-default stream.");
}

BigQueryWriteClient client = getBigQueryWriteClient(builder);
String location = builder.location;
if (location == null || location.isEmpty()) {
Expand Down Expand Up @@ -264,7 +275,7 @@ private StreamWriter(Builder builder) throws IOException {

@VisibleForTesting
static String extractDatasetAndProjectName(String streamName) {
Matcher streamMatcher = streamPattern.matcher(streamName);
Matcher streamMatcher = streamPatternDatasets.matcher(streamName);
if (streamMatcher.find()) {
return streamMatcher.group();
} else {
Expand All @@ -273,6 +284,12 @@ static String extractDatasetAndProjectName(String streamName) {
}
}

@VisibleForTesting
static boolean isDefaultStream(String streamName) {
  // A stream name counts as "default" when it contains the "/_default" segment.
  // Matching via find() (not a full-string match) deliberately accepts both the
  // full form ".../streams/_default" and the shortened ".../tables/t/_default".
  return streamPatternDefaultStream.matcher(streamName).find();
}

private BigQueryWriteClient getBigQueryWriteClient(Builder builder) throws IOException {
if (builder.client == null) {
BigQueryWriteSettings stubSettings =
Expand Down
Expand Up @@ -63,8 +63,8 @@
@RunWith(JUnit4.class)
public class JsonStreamWriterTest {
private static final Logger LOG = Logger.getLogger(JsonStreamWriterTest.class.getName());
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/s2";
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/_default";
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default";
private static final String TEST_TABLE = "projects/p/datasets/d/tables/t";
private static final ExecutorProvider SINGLE_THREAD_EXECUTOR =
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();
Expand Down
Expand Up @@ -62,8 +62,10 @@
@RunWith(JUnit4.class)
public class StreamWriterTest {
private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName());
private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/s1";
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/s2";
private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/_default";
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default";
private static final String TEST_STREAM_SHORTEN = "projects/p/datasets/d2/tables/t2/_default";
private static final String EXPLICIT_STEAM = "projects/p/datasets/d1/tables/t1/streams/s1";
private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
private FakeScheduledExecutorService fakeExecutor;
private FakeBigQueryWrite testBigQueryWrite;
Expand Down Expand Up @@ -366,6 +368,31 @@ public void run() throws Throwable {
});
}

@Test
public void testEnableConnectionPoolOnExplicitStream() throws Exception {
  // Enabling the connection pool on an explicit (non-default) stream must be
  // rejected at build time with an IllegalArgumentException.
  IllegalArgumentException ex =
      assertThrows(
          IllegalArgumentException.class,
          () ->
              StreamWriter.newBuilder(EXPLICIT_STEAM, client)
                  .setEnableConnectionPool(true)
                  .build());
  // Verify the failure is the connection-pool rejection, not some other
  // argument validation error.
  assertTrue(ex.getMessage().contains("Trying to enable connection pool in non-default stream."));
}

@Test
public void testShortenStreamNameAllowed() throws Exception {
  // The shortened default-stream form (".../tables/t/_default", no "/streams/"
  // segment) must be accepted with the connection pool enabled. Success here
  // means build() completes without throwing.
  StreamWriter.Builder shortNameBuilder =
      StreamWriter.newBuilder(TEST_STREAM_SHORTEN, client).setLocation("us");
  shortNameBuilder.setEnableConnectionPool(true).build();
}

@Test
public void testAppendSuccessAndConnectionError() throws Exception {
StreamWriter writer = getTestStreamWriter();
Expand Down

0 comments on commit bd89556

Please sign in to comment.